Optimize single node selecting in Shrink Action of ILM #76206
base: main
Conversation
Pinging @elastic/es-core-features (Team:Core/Features)
Thanks for opening this @gaobinlong, I think it's a good improvement for selecting a node. I will take a look and leave some review comments.
Thanks @gaobinlong, I left a number of comments on this.
...ck/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java
...ck/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java
Arrays.stream(indexShardStats.getShards()).mapToLong(shardStats ->
    shardStats.getStats().getStore().getSizeInBytes()).sum()).sum();
Rather than do this for all shards, we can do something similar to:
indexShardStats.getPrimary().getStore().getSizeInBytes()
And then avoid the division below by the number of replicas (because we really only care about the size of the primary shards anyway):
- Arrays.stream(indexShardStats.getShards()).mapToLong(shardStats ->
-     shardStats.getStats().getStore().getSizeInBytes()).sum()).sum();
+ indexShardStats.getPrimary().getStore().getSizeInBytes()).sum();
Also, getStore() is @Nullable, so protection should be added to ensure it doesn't throw an NPE when it's null.
I found that when we add the index.routing.allocation.require._id setting to the index, either the primary shard or a replica shard may be relocated to the selected node, so we should care about the size of both the primary and replica shards.
if (indexMetadata.getNumberOfReplicas() != 0) {
    indexPrimaryShardsStorageBytes /= indexMetadata.getNumberOfReplicas();
}
This can be removed if we use the primary stats above
The calculation above is used to indicate "how much of the source index's storage" is available per node. As shrink works from both primary and replica I believe it's correct to include replicas in the math.
if (diskThresholdSettings.getFreeDiskThresholdLow() != 0) {
    freeBytesThresholdLow = (long) Math.ceil(nodeTotalBytes *
        diskThresholdSettings.getFreeDiskThresholdLow() * 0.01);
} else {
    freeBytesThresholdLow = diskThresholdSettings.getFreeBytesThresholdLow().getBytes();
}
I'm not sure why you're doing this here; when constructing the DiskThresholdSettings the bytes are always calculated (see the setLowWatermark(...) call in the constructor), so I think you can always use getFreeBytesThresholdLow() to get the number of bytes rather than doing a calculation with the percentage?
In my testing, diskThresholdSettings.getFreeBytesThresholdLow() returns 0 if we set the low watermark to a percentage, and conversely getFreeDiskThresholdLow() returns 0 if we set it to an absolute byte value.
Gosh, so this is very confusing. @gaobinlong you're right about getFreeBytesThresholdLow returning 0 when the watermark is configured using percentages. Like Lee, I've also been tripped up by the setLowWatermark method in DiskThresholdSettings.
For another PR - we should rename the methods used in setLowWatermark to reflect that they only maybe return something, i.e. thresholdPercentageFromWatermark -> thresholdPercentageFromWatermarkIfPercentageConfigured or maybeThresholdPercentageFromWatermark. Similarly with thresholdBytesFromWatermark.
Can we extract this math into a method in DiskThresholdSettings? i.e. getLowWatermarkAsBytes or something similarly named? (with corresponding unit tests)
@andreidan, thanks for the suggestion, I've added some public methods in DiskThresholdSettings and added some unit tests.
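As a rough illustration of the kind of conversion helper being discussed, here is a minimal plain-Java sketch; the class, method, and parameter names are hypothetical and do not reflect the actual API added to DiskThresholdSettings:

```java
// Hypothetical sketch mirroring the watermark math in this step.
// Exactly one of freeDiskPercentLow (> 0) or freeBytesLow (> 0) is
// expected to be configured; the unconfigured one is 0, matching how
// getFreeDiskThresholdLow()/getFreeBytesThresholdLow() behave.
public class WatermarkSketch {
    public static long lowWatermarkBytes(long nodeTotalBytes, double freeDiskPercentLow, long freeBytesLow) {
        if (freeDiskPercentLow != 0) {
            // low watermark configured as a percentage of the node's total disk
            return (long) Math.ceil(nodeTotalBytes * freeDiskPercentLow * 0.01);
        }
        // low watermark configured as an absolute byte value
        return freeBytesLow;
    }

    public static void main(String[] args) {
        System.out.println(lowWatermarkBytes(1000L, 10.0, 0L));  // 100
        System.out.println(lowWatermarkBytes(1000L, 0.0, 250L)); // 250
    }
}
```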
...ck/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java
if (nodeAvailableBytes > freeBytesThresholdLow + 2 * indexPrimaryShardsStorageBytes -
        shardsOnCurrentNodeStorageBytes) {
    validRoutingNodes.add(node);
}
I think we'll need some sort of signalling here when there isn't enough space, so potentially capturing the event where none of the otherwise valid nodes can hold all the shards, and changing the Exception thrown in that case to include that message.
List<Map.Entry<String, Long>> nodeShardsStorageList = new ArrayList<>(nodeShardsStorageBytes.entrySet());
nodeShardsStorageList.sort((o1, o2) -> o2.getValue().compareTo(o1.getValue()));
Optional<String> nodeId = Optional.empty();
for (Map.Entry<String, Long> entry : nodeShardsStorageList) {
    // we prefer to select the node which contains the maximum shards storage bytes of the index from the valid node list
    if (validNodeIds.contains(entry.getKey())) {
        nodeId = Optional.of(entry.getKey());
        break;
    }
}

// if we cannot find a node which contains any shard of the index,
// shuffle the valid node list and select randomly
if (nodeId.isEmpty()) {
    List<String> list = new ArrayList<>(validNodeIds);
    Randomness.shuffle(list);
    nodeId = list.stream().findAny();
}
I think it'd be useful to factor this into a separate method and then make it unit testable, what do you think?
That's a good idea, I've updated the code.
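For illustration, an extracted, unit-testable helper along these lines might look like the following plain-Java sketch; the class and method names are hypothetical, and java.util.Collections.shuffle stands in for Randomness.shuffle:

```java
import java.util.*;

public class NodeSelectionSketch {
    // Prefer the valid node holding the most storage bytes of the source
    // index; if no valid node holds any shard, pick a valid node at random.
    public static Optional<String> selectNode(Map<String, Long> nodeShardsStorageBytes, Set<String> validNodeIds) {
        Optional<String> best = nodeShardsStorageBytes.entrySet().stream()
            .filter(e -> validNodeIds.contains(e.getKey()))
            .max(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey);
        if (best.isPresent()) {
            return best;
        }
        // fall back to a random valid node
        List<String> shuffled = new ArrayList<>(validNodeIds);
        Collections.shuffle(shuffled);
        return shuffled.isEmpty() ? Optional.empty() : Optional.of(shuffled.get(0));
    }

    public static void main(String[] args) {
        Map<String, Long> bytes = Map.of("a", 10L, "b", 30L, "c", 20L);
        System.out.println(selectNode(bytes, Set.of("a", "b")).get()); // b
    }
}
```

Keeping the method free of cluster-state dependencies makes it straightforward to unit test with plain maps and sets.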
listener.onFailure(new NoNodeAvailableException("could not find any nodes to allocate index [" +
    indexName + "] onto prior to shrink"));
This is the exception that we should enhance if there is a node that would normally be valid, but fails because it doesn't have enough space to hold all the primary shards.
    listener.onFailure(new NoNodeAvailableException("could not find any nodes to allocate index [" + indexName +
        "] onto prior to shrink"));
    }
}, listener::onFailure));
This requires some consideration, for example, what happens if the nodes stats times out, should we fail open (still try to find a node), or fail closed?
I think we should at least fill out the failure handler so that if the nodes stats call fails, the message in the exception that ILM explain will show is more human readable. Something like "failed to retrieve disk information to select a single node for primary shard allocation".
Yeah, that makes sense, I've done that.
@dakrone, sorry for the delay, I've pushed a new commit, could you take another look?
Thanks for iterating on this @gaobinlong and apologies for the long delay in this PR.
I think this looks very good. I've left a few more suggestions (nothing major though).
...ck/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java
...ck/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java
}

if (validNodeIds.size() == 0) {
    logger.debug("no nodes have enough disk space to hold one copy of the index [{}] onto prior to shrink ", indexName);
I believe this error message is incorrect as we're including the size of the (future) shrunken index in the math as well? We should reflect that in the message, unless I'm misreading this.
No, we only consider the size of the source index here.
...ck/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java
if (nodeId.isEmpty()) {
    List<String> list = new ArrayList<>(validNodeIds);
    Randomness.shuffle(list);
    nodeId = list.stream().findAny();
since we shuffled above, shall we just get the first one? (to avoid extra allocations by streaming the list)
Yeah, I think we should do that, I've updated the code.
Pinging @elastic/es-data-management (Team:Data Management)
Relates to #67957.
The main changes of this PR are:
Optimize node selection in SetSingleNodeAllocateStep of ILM's Shrink Action. The process is:
(1) Get node stats, including only fs info and index store stats.
(2) Calculate each node's storage bytes for the source index's shards.
(3) Accumulate all nodes' shard storage bytes to get the source index's primary shards storage bytes.
(4) Select the nodes which can be allocated two copies of the index's primary shards: if the file system doesn't support hard-linking, all segments are copied into the new shrunken index, so the node's disk must keep enough free bytes to stay above the low watermark for the new shrunken index to be initialized successfully.
(5) From the node list generated by step (4), select the node containing the maximum shards storage bytes of the source index, to reduce data transfer cost as much as possible.
(6) If we cannot find a node which contains any shard of the source index, shuffle the valid node list and select a node randomly.
Add some test methods for the changes above.
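The selection steps above can be sketched end to end in plain Java; the NodeInfo record and pickNode method below are illustrative stand-ins for the real node-stats plumbing, and the random fallback of step (6) is omitted for brevity:

```java
import java.util.*;

public class ShrinkNodeSketch {
    // Simplified per-node view: free disk bytes, plus the bytes of the
    // source index's shards currently on the node (steps 1-2).
    public record NodeInfo(long availableBytes, long indexShardBytes) {}

    public static Optional<String> pickNode(Map<String, NodeInfo> nodes, long freeBytesThresholdLow) {
        // (3) total storage of the source index across all nodes
        long indexBytes = nodes.values().stream().mapToLong(NodeInfo::indexShardBytes).sum();
        // (4) keep nodes that can hold two copies of the index (minus what
        // they already hold) while staying above the low watermark
        Set<String> valid = new HashSet<>();
        for (var e : nodes.entrySet()) {
            NodeInfo n = e.getValue();
            if (n.availableBytes() > freeBytesThresholdLow + 2 * indexBytes - n.indexShardBytes()) {
                valid.add(e.getKey());
            }
        }
        // (5) prefer the valid node already holding the most index data
        return nodes.entrySet().stream()
            .filter(e -> valid.contains(e.getKey()))
            .max(Map.Entry.comparingByValue(Comparator.comparingLong(NodeInfo::indexShardBytes)))
            .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        Map<String, NodeInfo> nodes = Map.of(
            "n1", new NodeInfo(500L, 50L),  // enough space, most index data
            "n2", new NodeInfo(500L, 10L),  // enough space, less index data
            "n3", new NodeInfo(100L, 40L)); // too little free space
        System.out.println(pickNode(nodes, 50L).get()); // n1
    }
}
```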