Add separate flood stage limit for frozen #71855

Merged
14 changes: 14 additions & 0 deletions docs/reference/modules/cluster/disk_allocator.asciidoc
@@ -113,6 +113,20 @@ PUT /my-index-000001/_settings
--
// end::cluster-routing-flood-stage-tag[]

[[cluster-routing-flood-stage-frozen]]
// tag::cluster-routing-flood-stage-tag[]
`cluster.routing.allocation.disk.watermark.flood_stage.frozen` {ess-icon}::
(<<dynamic-cluster-setting,Dynamic>>)
Controls the flood stage watermark for dedicated frozen nodes. Defaults to
95%.

`cluster.routing.allocation.disk.watermark.flood_stage.frozen.max_headroom` {ess-icon}::
(<<dynamic-cluster-setting,Dynamic>>)
Controls the max headroom for the flood stage watermark for dedicated frozen
nodes. Defaults to 20GB when
`cluster.routing.allocation.disk.watermark.flood_stage.frozen` is not explicitly
set. This caps the amount of free space required on dedicated frozen nodes.
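For example, on a dedicated frozen node with a 1,000 GB disk, the 95% flood
stage alone would require 50 GB of free space; the 20 GB max headroom caps that
requirement, so the flood stage triggers only once free space drops below 20 GB.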

`cluster.info.update.interval`::
(<<dynamic-cluster-setting,Dynamic>>)
How often {es} should check on disk usage for each node in the
@@ -33,6 +33,7 @@
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.set.Sets;

import java.util.ArrayList;
@@ -136,13 +137,20 @@ public void onNewInfo(ClusterInfo info) {
final DiskUsage usage = entry.value;
final RoutingNode routingNode = routingNodes.node(node);

-if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
-    usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
-    if (isFrozenOnlyNode(routingNode)) {
+if (isFrozenOnlyNode(routingNode)) {
+    ByteSizeValue total = ByteSizeValue.ofBytes(usage.getTotalBytes());
+    long frozenFloodStageThreshold = diskThresholdSettings.getFreeBytesThresholdFrozenFloodStage(total).getBytes();
+    if (usage.getFreeBytes() < frozenFloodStageThreshold) {
         logger.warn("flood stage disk watermark [{}] exceeded on {}",
-            diskThresholdSettings.describeFloodStageThreshold(), usage);
-        continue;
+            diskThresholdSettings.describeFrozenFloodStageThreshold(total), usage);
     }
+    // skip checking high/low watermarks for frozen nodes, since frozen shards have only insignificant local storage footprint
+    // and this allows us to use more of the local storage for cache.
+    continue;
 }

+if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
+    usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {

nodesOverLowThreshold.add(node);
nodesOverHighThreshold.add(node);
@@ -162,12 +170,6 @@ public void onNewInfo(ClusterInfo info) {
continue;
}

-if (isFrozenOnlyNode(routingNode)) {
-    // skip checking high/low watermarks for frozen nodes, since frozen shards have only insignificant local storage footprint
-    // and this allows us to use more of the local storage for cache.
-    continue;
-}
-
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {

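Distilled, the per-node flow after this change looks like the sketch below. This is a simplification of the method body above, not the literal code; read-only marking, auto-release tracking and reroute logic are elided.

```java
// Simplified sketch of the per-node check order in onNewInfo (illustrative).
if (isFrozenOnlyNode(routingNode)) {
    // Frozen nodes get their own flood stage threshold, derived from total disk size.
    ByteSizeValue total = ByteSizeValue.ofBytes(usage.getTotalBytes());
    if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFrozenFloodStage(total).getBytes()) {
        // At worst a warning is logged; indices on frozen nodes are not marked read-only here.
        logger.warn("flood stage disk watermark [{}] exceeded on {}",
            diskThresholdSettings.describeFrozenFloodStageThreshold(total), usage);
    }
    continue; // low/high watermark bookkeeping is skipped entirely for frozen nodes
}
// Non-frozen nodes fall through to the unchanged flood/high/low watermark checks.
```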
@@ -16,6 +16,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.RatioValue;
import org.elasticsearch.common.unit.RelativeByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Iterator;
@@ -45,6 +46,21 @@ public class DiskThresholdSettings {
(s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.flood_stage"),
new FloodStageValidator(),
Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<RelativeByteSizeValue> CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_SETTING =
new Setting<>("cluster.routing.allocation.disk.watermark.flood_stage.frozen", "95%",
(s) -> RelativeByteSizeValue.parseRelativeByteSizeValue(s, "cluster.routing.allocation.disk.watermark.flood_stage.frozen"),
Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<ByteSizeValue> CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING =
new Setting<>("cluster.routing.allocation.disk.watermark.flood_stage.frozen.max_headroom",
(settings) -> {
if (CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_SETTING.exists(settings)) {
return "-1";
} else {
return "20GB";
}
},
(s) -> ByteSizeValue.parseBytesSizeValue(s, "cluster.routing.allocation.disk.watermark.flood_stage.frozen.max_headroom"),
Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<TimeValue> CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING =
Setting.positiveTimeSetting("cluster.routing.allocation.disk.reroute_interval", TimeValue.timeValueSeconds(60),
Setting.Property.Dynamic, Setting.Property.NodeScope);
@@ -59,6 +75,8 @@ public class DiskThresholdSettings {
private volatile TimeValue rerouteInterval;
private volatile Double freeDiskThresholdFloodStage;
private volatile ByteSizeValue freeBytesThresholdFloodStage;
private volatile RelativeByteSizeValue frozenFloodStage;
private volatile ByteSizeValue frozenFloodStageMaxHeadroom;

static {
assert Version.CURRENT.major == Version.V_7_0_0.major + 1; // this check is unnecessary in v9
@@ -69,18 +87,24 @@ public class DiskThresholdSettings {
}
}


public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
final String highWatermark = CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.get(settings);
final String floodStage = CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.get(settings);
setHighWatermark(highWatermark);
setLowWatermark(lowWatermark);
setFloodStage(floodStage);
setFrozenFloodStage(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_SETTING.get(settings));
setFrozenFloodStageMaxHeadroom(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING.get(settings));
this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings);
this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING, this::setFloodStage);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_SETTING, this::setFrozenFloodStage);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING,
this::setFrozenFloodStageMaxHeadroom);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
}
@@ -242,6 +266,15 @@ private void setFloodStage(String floodStageRaw) {
CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey());
}

private void setFrozenFloodStage(RelativeByteSizeValue floodStage) {
this.frozenFloodStage = floodStage;
}

private void setFrozenFloodStageMaxHeadroom(ByteSizeValue maxHeadroom) {
this.frozenFloodStageMaxHeadroom = maxHeadroom;
}


/**
* Gets the raw (uninterpreted) low watermark value as found in the settings.
*/
@@ -280,6 +313,14 @@ public ByteSizeValue getFreeBytesThresholdFloodStage() {
return freeBytesThresholdFloodStage;
}

public ByteSizeValue getFreeBytesThresholdFrozenFloodStage(ByteSizeValue total) {
// flood stage bytes are reversed compared to percentage, so we special handle it.
RelativeByteSizeValue frozenFloodStage = this.frozenFloodStage;
if (frozenFloodStage.isAbsolute()) {
return frozenFloodStage.getAbsolute();
}
return ByteSizeValue.ofBytes(total.getBytes() - frozenFloodStage.calculateValue(total, frozenFloodStageMaxHeadroom).getBytes());
}

public boolean isEnabled() {
return enabled;
}
@@ -306,6 +347,18 @@ String describeFloodStageThreshold() {
: freeBytesThresholdFloodStage.toString();
}

String describeFrozenFloodStageThreshold(ByteSizeValue total) {
ByteSizeValue maxHeadroom = this.frozenFloodStageMaxHeadroom;
RelativeByteSizeValue floodStage = this.frozenFloodStage;
if (floodStage.isAbsolute()) {
return floodStage.getStringRep();
} else if (floodStage.calculateValue(total, maxHeadroom).equals(floodStage.calculateValue(total, null))) {
return Strings.format1Decimals(floodStage.getRatio().getAsPercent(), "%");
} else {
return "max_headroom=" + maxHeadroom;
}
}

/**
* Attempts to parse the watermark into a percentage, returning 100.0% if
* it cannot be parsed.
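
Putting the two new methods together, here is a quick sketch of the arithmetic under the default settings (95% ratio with a 20GB max headroom). The construction mirrors how unit tests typically build the settings object; the disk sizes are illustrative.

```java
// Defaults apply: flood_stage.frozen = 95%, max_headroom = 20gb.
DiskThresholdSettings settings = new DiskThresholdSettings(Settings.EMPTY,
    new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));

// Large disk: the headroom binds.
// calculateValue = max(ceil(0.95 * 1000gb), 1000gb - 20gb) = 980gb used,
// so the free-bytes threshold is 1000gb - 980gb = 20gb, and
// describeFrozenFloodStageThreshold renders "max_headroom=20gb".
settings.getFreeBytesThresholdFrozenFloodStage(ByteSizeValue.ofGb(1000)); // 20gb

// Small disk: the 95% ratio binds and the headroom has no effect.
// calculateValue = max(ceil(0.95 * 100gb), 100gb - 20gb) = 95gb used,
// so the free-bytes threshold is 5gb and the description renders "95.0%".
settings.getFreeBytesThresholdFrozenFloodStage(ByteSizeValue.ofGb(100)); // 5gb
```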
@@ -221,6 +221,8 @@ public void apply(Settings value, Settings current, Settings previous) {
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING,
@@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.unit;

import org.elasticsearch.ElasticsearchParseException;

/**
* A byte size value that allows specification using either of:
* 1. Absolute value (200GB for instance)
* 2. Relative percentage value (95%)
* 3. Relative ratio value (0.95)
*/
public class RelativeByteSizeValue {

public static final String MAX_HEADROOM_PREFIX = "max_headroom=";
private final ByteSizeValue absolute;
private final RatioValue ratio;

public RelativeByteSizeValue(ByteSizeValue absolute) {
this.absolute = absolute;
this.ratio = null;
}

public RelativeByteSizeValue(RatioValue ratio) {
this.absolute = null;
this.ratio = ratio;
}

public boolean isAbsolute() {
return absolute != null;
}

public ByteSizeValue getAbsolute() {
return absolute;
}

public RatioValue getRatio() {
(Contributor author comment: This entire class and associated test is also in #71844; the only difference is that this method was made public here.)

return ratio;
}

/**
* Calculate the size to use, optionally catering for a max headroom.
* @param total the total size to use
* @param maxHeadroom the max headroom to cater for or null (or -1) to ignore.
* @return the size to use
*/
public ByteSizeValue calculateValue(ByteSizeValue total, ByteSizeValue maxHeadroom) {
if (ratio != null) {
long ratioBytes = (long) Math.ceil(ratio.getAsRatio() * total.getBytes());
if (maxHeadroom != null && maxHeadroom.getBytes() != -1) {
return ByteSizeValue.ofBytes(Math.max(ratioBytes, total.getBytes() - maxHeadroom.getBytes()));
} else {
return ByteSizeValue.ofBytes(ratioBytes);
}
} else {
return absolute;
}
}

public boolean isNonZeroSize() {
if (ratio != null) {
return ratio.getAsRatio() > 0.0d;
} else {
return absolute.getBytes() > 0;
}
}

public static RelativeByteSizeValue parseRelativeByteSizeValue(String value, String settingName) {
try {
RatioValue ratio = RatioValue.parseRatioValue(value);
if (ratio.getAsPercent() != 0.0d || value.endsWith("%")) {
return new RelativeByteSizeValue(ratio);
} else {
return new RelativeByteSizeValue(ByteSizeValue.ZERO);
}
} catch (ElasticsearchParseException e) {
// ignore, see if it parses as bytes
}
try {
return new RelativeByteSizeValue(ByteSizeValue.parseBytesSizeValue(value, settingName));
// todo: fix NumberFormatException case in ByteSizeValue.
} catch (NumberFormatException | ElasticsearchParseException e) {
throw new ElasticsearchParseException("unable to parse [{}={}] as either percentage or bytes", e,
settingName, value);
}
}

public String getStringRep() {
if (ratio != null) {
return ratio.toString();
} else {
return absolute.getStringRep();
}
}
}
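
A minimal usage sketch of the new class; the setting name "my.setting" is just a placeholder, and the commented values follow from calculateValue's max(ratio bytes, total - headroom) rule.

```java
// Percentage form: relative to whatever total it is applied to.
RelativeByteSizeValue pct = RelativeByteSizeValue.parseRelativeByteSizeValue("95%", "my.setting");
pct.isAbsolute();                                   // false
pct.calculateValue(ByteSizeValue.ofGb(100), null);  // 95gb: ceil(0.95 * total)
pct.calculateValue(ByteSizeValue.ofGb(1000),
    ByteSizeValue.ofGb(20));                        // 980gb: max(950gb, 1000gb - 20gb)

// Absolute form: the total and any headroom are ignored.
RelativeByteSizeValue abs = RelativeByteSizeValue.parseRelativeByteSizeValue("200GB", "my.setting");
abs.isAbsolute();                                   // true
abs.calculateValue(ByteSizeValue.ofGb(1000), null); // always 200gb
```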
@@ -32,6 +32,7 @@
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.MockLogAppender;
@@ -450,7 +451,12 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste
final ImmutableOpenMap.Builder<String, DiskUsage> allDisksOkBuilder;
allDisksOkBuilder = ImmutableOpenMap.builder();
allDisksOkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(15, 100)));
-allDisksOkBuilder.put("frozen", new DiskUsage("frozen", "frozen", "/foo/bar", 100, between(15, 100)));
+if (randomBoolean()) {
+    allDisksOkBuilder.put("frozen", new DiskUsage("frozen", "frozen", "/foo/bar", 100, between(15, 100)));
+} else {
+    allDisksOkBuilder.put("frozen", new DiskUsage("frozen", "frozen", "/foo/bar", ByteSizeValue.ofGb(1000).getBytes(),
+        (randomBoolean() ? ByteSizeValue.ofGb(between(20, 1000)) : ByteSizeValue.ofGb(between(20, 50))).getBytes()));
+}
final ImmutableOpenMap<String, DiskUsage> allDisksOk = allDisksOkBuilder.build();

final ImmutableOpenMap.Builder<String, DiskUsage> aboveLowWatermarkBuilder = ImmutableOpenMap.builder();
@@ -475,6 +481,13 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste
frozenAboveFloodStageWatermarkBuilder.put("frozen", new DiskUsage("frozen", "frozen", "/foo/bar", 100, between(0, 4)));
final ImmutableOpenMap<String, DiskUsage> frozenAboveFloodStageWatermark = frozenAboveFloodStageWatermarkBuilder.build();

final ImmutableOpenMap.Builder<String, DiskUsage> frozenAboveFloodStageMaxHeadroomBuilder = ImmutableOpenMap.builder();
// node1 is below low watermark, so no logging from it.
frozenAboveFloodStageMaxHeadroomBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(15, 100)));
frozenAboveFloodStageMaxHeadroomBuilder.put("frozen", new DiskUsage("frozen", "frozen", "/foo/bar",
ByteSizeValue.ofGb(1000).getBytes(), ByteSizeValue.ofGb(between(0, 19)).getBytes()));
final ImmutableOpenMap<String, DiskUsage> frozenAboveFloodStageMaxHeadroom = frozenAboveFloodStageMaxHeadroomBuilder.build();

assertNoLogging(monitor, allDisksOk);

assertSingleInfoMessage(monitor, aboveLowWatermark,
@@ -546,6 +559,9 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste

assertRepeatedWarningMessages(monitor, frozenAboveFloodStageWatermark, "flood stage disk watermark [95%] exceeded on *frozen*");

assertRepeatedWarningMessages(monitor, frozenAboveFloodStageMaxHeadroom,
"flood stage disk watermark [max_headroom=20gb] exceeded on *frozen*");

assertNoLogging(monitor, allDisksOk);
}
