Autoscaling during shrink #88292

Merged
6 changes: 6 additions & 0 deletions docs/changelog/88292.yaml
@@ -0,0 +1,6 @@
pr: 88292
summary: Autoscaling during shrink
area: Autoscaling
type: bug
issues:
- 85480
DiscoveryNodeFilters.java
@@ -22,6 +22,7 @@

public class DiscoveryNodeFilters {

public static final Set<String> SINGLE_NODE_NAMES = Set.of("_id", "_name", "name");
static final Set<String> NON_ATTRIBUTE_NAMES = Set.of("_ip", "_host_ip", "_publish_ip", "host", "_id", "_name", "name");

public enum OpType {
@@ -234,6 +235,20 @@ public boolean isOnlyAttributeValueFilter() {
return filters.keySet().stream().anyMatch(NON_ATTRIBUTE_NAMES::contains) == false;
}

/**
* @return true if filter is for a single node
*/
public boolean isSingleNodeFilter() {
return withoutTierPreferences != null && withoutTierPreferences.isSingleNodeFilterInternal();
}

private boolean isSingleNodeFilterInternal() {
return (filters.size() == 1
&& NON_ATTRIBUTE_NAMES.contains(filters.keySet().iterator().next())
&& (filters.values().iterator().next().length == 1 || opType == OpType.AND))
|| (filters.size() > 1 && opType == OpType.AND && NON_ATTRIBUTE_NAMES.containsAll(filters.keySet()));
}

/**
* Generates a human-readable string for the DiscoverNodeFilters.
* Example: {@code _id:"id1 OR blah",name:"blah OR name2"}
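Aside, to make the single-node semantics above concrete: a minimal, self-contained sketch of the same predicate (a hypothetical filters map and opType stand in for the class's fields), with a few example inputs:

import java.util.Map;
import java.util.Set;

class SingleNodeFilterSketch {
    enum OpType { AND, OR }

    static final Set<String> NON_ATTRIBUTE_NAMES = Set.of("_ip", "_host_ip", "_publish_ip", "host", "_id", "_name", "name");

    // Mirrors isSingleNodeFilterInternal: true when the filter can match at most one
    // node - a single non-attribute key with one value (or AND semantics), or several
    // non-attribute keys combined with AND.
    static boolean isSingleNodeFilter(Map<String, String[]> filters, OpType opType) {
        return (filters.size() == 1
            && NON_ATTRIBUTE_NAMES.contains(filters.keySet().iterator().next())
            && (filters.values().iterator().next().length == 1 || opType == OpType.AND))
            || (filters.size() > 1 && opType == OpType.AND && NON_ATTRIBUTE_NAMES.containsAll(filters.keySet()));
    }

    public static void main(String[] args) {
        System.out.println(isSingleNodeFilter(Map.of("_id", new String[] { "node-1" }), OpType.OR));     // true
        System.out.println(isSingleNodeFilter(Map.of("_name", new String[] { "n1", "n2" }), OpType.OR)); // false: two candidate nodes
        System.out.println(isSingleNodeFilter(Map.of("rack", new String[] { "r1" }), OpType.AND));       // false: attribute filter
    }
}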
@@ -90,7 +90,10 @@ public void testScaleUp() throws IOException, InterruptedException {
response.results().get(policyName).requiredCapacity().total().storage().getBytes(),
Matchers.greaterThanOrEqualTo(enoughSpace + used)
);
- assertThat(response.results().get(policyName).requiredCapacity().node().storage().getBytes(), Matchers.equalTo(maxShardSize));
+ assertThat(
+     response.results().get(policyName).requiredCapacity().node().storage().getBytes(),
+     Matchers.equalTo(maxShardSize + ReactiveStorageDeciderService.NODE_DISK_OVERHEAD + LOW_WATERMARK_BYTES)
+ );

// with 0 window, we expect just current.
putAutoscalingPolicy(
@@ -101,7 +104,10 @@
assertThat(response.results().keySet(), Matchers.equalTo(Set.of(policyName)));
assertThat(response.results().get(policyName).currentCapacity().total().storage().getBytes(), Matchers.equalTo(enoughSpace));
assertThat(response.results().get(policyName).requiredCapacity().total().storage().getBytes(), Matchers.equalTo(enoughSpace));
- assertThat(response.results().get(policyName).requiredCapacity().node().storage().getBytes(), Matchers.equalTo(maxShardSize));
+ assertThat(
+     response.results().get(policyName).requiredCapacity().node().storage().getBytes(),
+     Matchers.equalTo(maxShardSize + ReactiveStorageDeciderService.NODE_DISK_OVERHEAD + LOW_WATERMARK_BYTES)
+ );
}

private void putAutoscalingPolicy(String policyName, Settings settings) {

Large diffs are not rendered by default.

ReactiveStorageDeciderService.java
@@ -21,6 +21,8 @@
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
@@ -32,6 +34,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
@@ -73,10 +76,33 @@

public class ReactiveStorageDeciderService implements AutoscalingDeciderService {
public static final String NAME = "reactive_storage";
/**
* An estimate of the disk space used by everything not accounted for by the shard sizes in ClusterInfo.
* Set conservatively low for now.
*/
static final long NODE_DISK_OVERHEAD = ByteSizeValue.ofMb(10).getBytes();

private final DiskThresholdSettings diskThresholdSettings;
private final AllocationDeciders allocationDeciders;

private static final Predicate<String> REMOVE_NODE_LOCKED_FILTER_INITIAL = removeNodeLockedFilterPredicate(
IndexMetadata.INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING.getKey()
);

private static final Predicate<String> REMOVE_NODE_LOCKED_FILTER_REQUIRE = removeNodeLockedFilterPredicate(
IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey()
);

private static final Predicate<String> REMOVE_NODE_LOCKED_FILTER_INCLUDE = removeNodeLockedFilterPredicate(
IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey()
);

private static Predicate<String> removeNodeLockedFilterPredicate(String settingPrefix) {
return Predicate.not(
DiscoveryNodeFilters.SINGLE_NODE_NAMES.stream().map(settingPrefix::concat).collect(Collectors.toSet())::contains
);
}
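// Note: each predicate above accepts every settings key except the single-node
// variants of one filter group, e.g. REMOVE_NODE_LOCKED_FILTER_REQUIRE rejects
// "index.routing.allocation.require._id", "..._name" and "...name".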

public ReactiveStorageDeciderService(Settings settings, ClusterSettings clusterSettings, AllocationDeciders allocationDeciders) {
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
this.allocationDeciders = allocationDeciders;
@@ -116,13 +142,16 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
var unassignedBytesUnassignedShards = allocationState.storagePreventsAllocation();
long unassignedBytes = unassignedBytesUnassignedShards.sizeInBytes();
long maxShardSize = allocationState.maxShardSize();
long maxNodeLockedSize = allocationState.maxNodeLockedSize();
long minimumNodeSize = nodeSizeForDataBelowLowWatermark(Math.max(maxShardSize, maxNodeLockedSize), diskThresholdSettings)
+ NODE_DISK_OVERHEAD;
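// minimumNodeSize sizes a single node for the larger of the biggest shard and the
// biggest node-locked data set (e.g. a shrink source collocated on one node),
// scaled so the data still sits below the low disk watermark, plus a small fixed overhead.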
assert assignedBytes >= 0;
assert unassignedBytes >= 0;
assert maxShardSize >= 0;
String message = message(unassignedBytes, assignedBytes);
AutoscalingCapacity requiredCapacity = AutoscalingCapacity.builder()
.total(autoscalingCapacity.total().storage().getBytes() + unassignedBytes + assignedBytes, null, null)
- .node(maxShardSize, null, null)
+ .node(minimumNodeSize, null, null)
.build();
return new AutoscalingDeciderResult(
requiredCapacity,
@@ -150,6 +179,10 @@ static boolean isDiskOnlyNoDecision(Decision decision) {
return singleNoDecision(decision, single -> true).map(DiskThresholdDecider.NAME::equals).orElse(false);
}

static boolean isResizeOnlyNoDecision(Decision decision) {
return singleNoDecision(decision, single -> true).map(ResizeAllocationDecider.NAME::equals).orElse(false);
}

static boolean isFilterTierOnlyDecision(Decision decision, IndexMetadata indexMetadata) {
// only primary shards are handled here, allowing us to disregard same shard allocation decider.
return singleNoDecision(decision, single -> SameShardAllocationDecider.NAME.equals(single.label()) == false).filter(
@@ -185,9 +218,24 @@ static Optional<String> singleNoDecision(Decision decision, Predicate<Decision>
}
}

static long nodeSizeForDataBelowLowWatermark(long bytes, DiskThresholdSettings thresholdSettings) {
ByteSizeValue bytesThreshold = thresholdSettings.getFreeBytesThresholdLow();
if (bytesThreshold.getBytes() != 0) {
return bytesThreshold.getBytes() + bytes;
} else {
double percentThreshold = thresholdSettings.getFreeDiskThresholdLow();
if (percentThreshold >= 0.0 && percentThreshold < 100.0) {
return (long) (bytes / ((100.0 - percentThreshold) / 100));
} else {
return bytes;
}
}
}
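// Worked example (assumed settings, not from this diff): with a 500gb low watermark
// expressed as free bytes, a 100gb shard needs a 600gb node; with the percentage
// form at 15% free (85% used), the same shard needs 100gb / 0.85 ≈ 117.6gb.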

// todo: move this to top level class.
public static class AllocationState {
private final ClusterState state;
private final ClusterState originalState;
private final AllocationDeciders allocationDeciders;
private final DiskThresholdSettings diskThresholdSettings;
private final ClusterInfo info;
@@ -222,7 +270,8 @@ public static class AllocationState {
Set<DiscoveryNode> nodes,
Set<DiscoveryNodeRole> roles
) {
- this.state = state;
+ this.state = removeNodeLockFilters(state);
Contributor: I wonder if removing the node lock filter in all cases couldn't lead to a different allocation outcome? Maybe we should only remove the initial_recovery setting here? I'm not 100% sure though.

Contributor Author: The idea I followed was that we should ensure that we have a node that is large enough to hold the node-locked data, hence the addition to ensure we deliver a proper node-level size.

With that done, we can assume that it is an allocation problem if it cannot fit. Hence it seems fair to remove the node locking here. I am aware that our allocation system is not yet sophisticated enough, but I'd rather not autoscale to a too large setup (since that may be multiple steps too large) in that case. For ILM controlled shrink, it will eventually fail and retry. Manual intervention may be necessary until our allocation system can handle this.

Contributor: That makes sense, thanks for clarifying.
this.originalState = state;
this.allocationDeciders = allocationDeciders;
this.diskThresholdSettings = diskThresholdSettings;
this.info = info;
@@ -324,8 +373,16 @@ private boolean cannotAllocateDueToStorage(ShardRouting shard, RoutingAllocation
// enable debug decisions to see all decisions and preserve the allocation decision label
allocation.debugDecision(true);
try {
- return nodesInTier(allocation.routingNodes()).map(node -> allocationDeciders.canAllocate(shard, node, allocation))
-     .anyMatch(ReactiveStorageDeciderService::isDiskOnlyNoDecision);
+ boolean diskOnly = nodesInTier(allocation.routingNodes()).map(
+     node -> allocationDeciders.canAllocate(shard, node, allocation)
+ ).anyMatch(ReactiveStorageDeciderService::isDiskOnlyNoDecision);
+ if (diskOnly && shard.unassigned() && shard.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
+     // For resize shards only allow autoscaling if there is no other node where the shard could fit had it not been
+     // a resize shard. Notice that we already removed any initial_recovery filters.
+     diskOnly = nodesInTier(allocation.routingNodes()).map(node -> allocationDeciders.canAllocate(shard, node, allocation))
+         .anyMatch(ReactiveStorageDeciderService::isResizeOnlyNoDecision) == false;
Contributor: Can we add a test where there's enough space to hold the resize shard so we verify that we don't request more capacity in that case?

Contributor: I find double negations a bit trappy 😅

Contributor: Sorry, I missed that.
+ }
+ return diskOnly;
} finally {
allocation.debugDecision(false);
}
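To unpack the double negation discussed above: after the disk-only check, a resize (shrink/clone/split) shard triggers scale-up only if no node in the tier rejected it solely because of the ResizeAllocationDecider — if such a node exists, the shard would fit there were it not a resize shard, so capacity is not the problem. A small, self-contained illustration (the label strings are assumed stand-ins for DiskThresholdDecider.NAME and ResizeAllocationDecider.NAME; per-node label sets stand in for real Decision objects):

import java.util.List;
import java.util.Set;

class ResizeNoDecisionSketch {
    // True when storage is genuinely the blocker: some node says NO only because of
    // disk, and no node says NO only because of the resize constraint.
    static boolean storagePreventsAllocation(List<Set<String>> noLabelsPerNode) {
        boolean diskOnly = noLabelsPerNode.stream().anyMatch(labels -> labels.equals(Set.of("disk_threshold")));
        // The "== false" from the diff: a node rejected only by the resize decider
        // proves the tier has room, so no extra storage is requested.
        return diskOnly && noLabelsPerNode.stream().anyMatch(labels -> labels.equals(Set.of("resize"))) == false;
    }

    public static void main(String[] args) {
        // Node A is blocked by disk only, node B only by resize: the shard fits on B.
        System.out.println(storagePreventsAllocation(List.of(Set.of("disk_threshold"), Set.of("resize")))); // false
        // Every node is blocked by disk alone: scale up.
        System.out.println(storagePreventsAllocation(List.of(Set.of("disk_threshold"), Set.of("disk_threshold")))); // true
    }
}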
@@ -395,16 +452,25 @@ boolean needsThisTier(ShardRouting shard, RoutingAllocation allocation) {

private boolean isAssignedToTier(ShardRouting shard, RoutingAllocation allocation) {
IndexMetadata indexMetadata = indexMetadata(shard, allocation);
- return DataTierAllocationDecider.shouldFilter(indexMetadata, roles, this::highestPreferenceTier, allocation) != Decision.NO;
+ return isAssignedToTier(indexMetadata, roles);
}

private static boolean isAssignedToTier(IndexMetadata indexMetadata, Set<DiscoveryNodeRole> roles) {
List<String> tierPreference = indexMetadata.getTierPreference();
return tierPreference.isEmpty() || DataTierAllocationDecider.allocationAllowed(highestPreferenceTier(tierPreference), roles);
}

private IndexMetadata indexMetadata(ShardRouting shard, RoutingAllocation allocation) {
return allocation.metadata().getIndexSafe(shard.index());
}

private Optional<String> highestPreferenceTier(List<String> preferredTiers, DiscoveryNodes unused, DesiredNodes desiredNodes) {
return Optional.of(highestPreferenceTier(preferredTiers));
}

private static String highestPreferenceTier(List<String> preferredTiers) {
assert preferredTiers.isEmpty() == false;
- return Optional.of(preferredTiers.get(0));
+ return preferredTiers.get(0);
}

public long maxShardSize() {
@@ -414,6 +480,49 @@ public long maxShardSize() {
.orElse(0L);
}

public long maxNodeLockedSize() {
Metadata metadata = originalState.getMetadata();
return metadata.indices().values().stream().mapToLong(imd -> nodeLockedSize(imd, metadata)).max().orElse(0L);
}

private long nodeLockedSize(IndexMetadata indexMetadata, Metadata metadata) {
if (isNodeLocked(indexMetadata)) {
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(indexMetadata.getIndex());
long sum = 0;
for (int s = 0; s < indexMetadata.getNumberOfShards(); ++s) {
ShardRouting shard = indexRoutingTable.shard(s).primaryShard();
long size = sizeOf(shard);
sum += size;
}
if (indexMetadata.getResizeSourceIndex() != null) {
// since we only report the max size for an index, count a shrink/clone/split 2x if it is node locked.
sum = sum * 2;
}
return sum;
} else {
Index resizeSourceIndex = indexMetadata.getResizeSourceIndex();
if (resizeSourceIndex != null) {
IndexMetadata sourceIndexMetadata = metadata.getIndexSafe(resizeSourceIndex);
// ResizeAllocationDecider only handles clone or split; do the same here.

if (indexMetadata.getNumberOfShards() >= sourceIndexMetadata.getNumberOfShards()) {
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(resizeSourceIndex);
long max = 0;
for (int s = 0; s < sourceIndexMetadata.getNumberOfShards(); ++s) {
ShardRouting shard = indexRoutingTable.shard(s).primaryShard();
long size = sizeOf(shard);
max = Math.max(max, size);
}

// 2x to account for the extra copy residing on the same node
return max * 2;
}
}
}

return 0;
}
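// Example with assumed sizes: a 4-shard index pinned to one node for shrink needs
// that node to hold 4 * 10gb = 40gb of primaries; if the index is itself a
// node-locked resize target, the sum is doubled, since source and target copies
// share the node until the resize completes.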

long sizeOf(ShardRouting shard) {
long expectedShardSize = getExpectedShardSize(shard);
if (expectedShardSize == 0L && shard.primary() == false) {
@@ -638,6 +747,48 @@ ClusterInfo info() {
return info;
}

private static ClusterState removeNodeLockFilters(ClusterState state) {
ClusterState.Builder builder = ClusterState.builder(state);
builder.metadata(removeNodeLockFilters(state.metadata()));
return builder.build();
}

private static Metadata removeNodeLockFilters(Metadata metadata) {
Metadata.Builder builder = Metadata.builder(metadata);
metadata.stream()
.filter(AllocationState::isNodeLocked)
.map(AllocationState::removeNodeLockFilters)
.forEach(imd -> builder.put(imd, false));
return builder.build();
}

private static IndexMetadata removeNodeLockFilters(IndexMetadata indexMetadata) {
Settings settings = indexMetadata.getSettings();
settings = removeNodeLockFilters(settings, REMOVE_NODE_LOCKED_FILTER_INITIAL, indexMetadata.getInitialRecoveryFilters());
settings = removeNodeLockFilters(settings, REMOVE_NODE_LOCKED_FILTER_REQUIRE, indexMetadata.requireFilters());
settings = removeNodeLockFilters(settings, REMOVE_NODE_LOCKED_FILTER_INCLUDE, indexMetadata.includeFilters());
return IndexMetadata.builder(indexMetadata).settings(settings).build();
}

private static Settings removeNodeLockFilters(Settings settings, Predicate<String> predicate, DiscoveryNodeFilters filters) {
// only filter if it is a single node filter - otherwise removing it risks narrowing legal nodes for OR filters.
if (filters != null && filters.isSingleNodeFilter()) {
return settings.filter(predicate);
} else {
return settings;
}
}
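// E.g. a shrink-prepared index carrying "index.routing.allocation.require._id":
// "node-1" (assumed setup) has that key dropped here; a multi-node OR filter such
// as "_name": "n1,n2" is left in place, per the comment above.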

private static boolean isNodeLocked(IndexMetadata indexMetadata) {
return isNodeLocked(indexMetadata.requireFilters())
|| isNodeLocked(indexMetadata.includeFilters())
|| isNodeLocked(indexMetadata.getInitialRecoveryFilters());
}

private static boolean isNodeLocked(DiscoveryNodeFilters filters) {
return filters != null && filters.isSingleNodeFilter();
}

private static class ExtendedClusterInfo extends ClusterInfo {
private final ClusterInfo delegate;
