Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid overshooting watermarks during relocation #46079

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -88,7 +87,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final NodeClient client;
private final List<Consumer<ClusterInfo>> listeners = Collections.synchronizedList(new ArrayList<>(1));
private final List<Consumer<ClusterInfo>> listeners = new CopyOnWriteArrayList<>();

public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
Expand Down Expand Up @@ -275,6 +274,11 @@ private void maybeRefresh() {
}
}

// allow tests to adjust the node stats on receipt
List<NodeStats> adjustNodesStats(List<NodeStats> nodeStats) {
return nodeStats;
}

/**
* Refreshes the ClusterInfo in a blocking fashion
*/
Expand All @@ -284,12 +288,13 @@ public final ClusterInfo refresh() {
}
final CountDownLatch nodeLatch = updateNodeStats(new ActionListener<NodesStatsResponse>() {
@Override
public void onResponse(NodesStatsResponse nodeStatses) {
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages = ImmutableOpenMap.builder();
fillDiskUsagePerNode(logger, nodeStatses.getNodes(), newLeastAvaiableUsages, newMostAvaiableUsages);
leastAvailableSpaceUsages = newLeastAvaiableUsages.build();
mostAvailableSpaceUsages = newMostAvaiableUsages.build();
public void onResponse(NodesStatsResponse nodesStatsResponse) {
ImmutableOpenMap.Builder<String, DiskUsage> leastAvailableUsagesBuilder = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiskUsage> mostAvailableUsagesBuilder = ImmutableOpenMap.builder();
fillDiskUsagePerNode(logger, adjustNodesStats(nodesStatsResponse.getNodes()),
leastAvailableUsagesBuilder, mostAvailableUsagesBuilder);
leastAvailableSpaceUsages = leastAvailableUsagesBuilder.build();
mostAvailableSpaceUsages = mostAvailableUsagesBuilder.build();
}

@Override
Expand Down Expand Up @@ -402,7 +407,7 @@ static void fillDiskUsagePerNode(Logger logger, List<NodeStats> nodeStatsArray,
if (leastAvailablePath == null) {
assert mostAvailablePath == null;
mostAvailablePath = leastAvailablePath = info;
} else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()){
} else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()) {
leastAvailablePath = info;
} else if (mostAvailablePath.getAvailable().getBytes() < info.getAvailable().getBytes()) {
mostAvailablePath = info;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ public void onNewInfo(ClusterInfo info) {
.collect(Collectors.toSet());

if (indicesToAutoRelease.isEmpty() == false) {
logger.info("releasing read-only block on indices " + indicesToAutoRelease
+ " since they are now allocated to nodes with sufficient disk space");
updateIndicesReadOnly(indicesToAutoRelease, listener, false);
} else {
listener.onResponse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,36 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio
boolean subtractShardsMovingAway, String dataPath) {
ClusterInfo clusterInfo = allocation.clusterInfo();
long totalSize = 0;
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
String actualPath = clusterInfo.getDataPath(routing);
if (dataPath.equals(actualPath)) {
if (routing.initializing() && routing.relocatingNodeId() != null) {
totalSize += getExpectedShardSize(routing, allocation, 0);
} else if (subtractShardsMovingAway && routing.relocating()) {

for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
if (routing.relocatingNodeId() == null) {
// in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
// by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking
// any additional space and can be ignored here
continue;
}

final String actualPath = clusterInfo.getDataPath(routing);
// if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least
// free space
if (actualPath == null || actualPath.equals(dataPath)) {
totalSize += getExpectedShardSize(routing, allocation, 0);
}
}

if (subtractShardsMovingAway) {
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING)) {
String actualPath = clusterInfo.getDataPath(routing);
if (actualPath == null) {
// we might know the path of this shard from before when it was relocating
actualPath = clusterInfo.getDataPath(routing.cancelRelocation());
}
if (dataPath.equals(actualPath)) {
totalSize -= getExpectedShardSize(routing, allocation, 0);
}
}
}

return totalSize;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -1002,4 +1001,20 @@ public void logShardStates(ClusterState state) {
rn.shardsWithState(RELOCATING),
rn.shardsWithState(STARTED));
}

/**
* ClusterInfo that always reports /dev/null for the shards' data paths.
*/
static class DevNullClusterInfo extends ClusterInfo {
DevNullClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage,
ImmutableOpenMap<String, Long> shardSizes) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null);
}

@Override
public String getDataPath(ShardRouting shardRouting) {
return "/dev/null";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDeciderTests.DevNullClusterInfo;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down
Loading