Integration with Ultrawarm #95
Ultrawarm introduces warm nodes into the ES cluster. Currently, we distribute model partitions randomly to all data nodes in the cluster, which could degrade model performance once warm nodes are throttled due to resource limitations. This PR excludes warm nodes from model partition placement. Since index shards are hosted on hot nodes, AD's coordinating nodes are on hot nodes as well, so we no longer need to send HourlyCron job and stats requests to warm nodes. This PR implements those changes.
Testing done:
1. Verified AD runs only on hot nodes.
2. The stats API and HourlyCron still work.
Question for transparency. Why are Ultrawarm nodes not eligible for anomaly detection?
import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

public class ClusterStateUtils {
Minor. Documentation is missing for public classes and methods.
added
public class ClusterStateUtils {
    private static final Logger LOG = LogManager.getLogger(ClusterStateUtils.class);
    private final ClusterService clusterService;
    private final Map<String, String> ignoredAttributes = new HashMap<String, String>();
Minor. The code will be more robust if this state is injected through the constructor rather than hardcoded, so that when the config changes, the constructor and unit tests do not need to change.
done
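To illustrate the suggestion above, here is a minimal sketch of a filter that receives its ignored attributes through the constructor. It is not the code from this PR: `SketchNode` and `AttributeNodeFilter` are hypothetical stand-ins (the real class works with `DiscoveryNode` and `ClusterService`), and the `"box_type"` and `"warm"` values used in the usage note are assumed.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a cluster node; only the attribute map matters here.
final class SketchNode {
    private final Map<String, String> attributes;

    SketchNode(Map<String, String> attributes) {
        this.attributes = new HashMap<>(attributes);
    }

    Map<String, String> getAttributes() {
        return attributes;
    }
}

// Sketch of a filter whose ignored attributes are supplied by the caller
// instead of being hardcoded inside the constructor body.
final class AttributeNodeFilter {
    // Defensive, read-only copy. Values are assumed to be non-null.
    private final Map<String, String> ignoredAttributes;

    AttributeNodeFilter(Map<String, String> ignoredAttributes) {
        this.ignoredAttributes = Collections.unmodifiableMap(new HashMap<>(ignoredAttributes));
    }

    // A node is eligible unless one of its attributes matches an ignored key/value pair.
    // A node that lacks the attribute entirely is treated as eligible.
    boolean isEligibleNode(SketchNode node) {
        for (Map.Entry<String, String> entry : ignoredAttributes.entrySet()) {
            String attribute = node.getAttributes().get(entry.getKey());
            if (entry.getValue().equals(attribute)) {
                return false;
            }
        }
        return true;
    }
}
```

A unit test can then build the filter with whatever attribute map it needs, for example a map with the single entry `"box_type" -> "warm"`, without touching the production wiring.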
    private final ClusterService clusterService;
    private final Map<String, String> ignoredAttributes = new HashMap<String, String>();

    @Inject
Question. Is this needed?
Yes, because the transport action needs this class. The transport action constructor uses Guice to find injected dependencies. Dependency classes must have either one (and only one) constructor annotated with @Inject or a zero-argument constructor.
Oh, I found it. It's the RestStatsAnomalyDetectorAction, right? Thanks!
It is StopDetectorTransportAction.
Do dependencies (to be injected) require the @Inject annotation? It makes sense for dependents to be annotated as entry points. For example, RCFResultTransportAction (a dependent) is annotated, but its dependencies such as ADCircuitBreakerService and ModelManager are not.
ClusterStateUtils is a dependency, not a dependent, right?
We need the injection because no implementation for java.util.HashMap was bound. ModelManager does not use @Inject because all of its parameters are bound using AnomalyDetectorPlugin.createComponents. We no longer need the injection since I changed the implementation. Please see the new PR.
        ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE);
    }

    public ImmutableOpenMap<String, DiscoveryNode> getEligibleDataNodes() {
Question. Why not use Map or unmodifiableMap so the specific class doesn't leak into client code?
That's what clusterService.state().nodes().getDataNodes() returns. ImmutableOpenMap is not a Map or unmodifiableMap. ES defined it in their own way.
Since the new code is a wrapper over the API and creates a new map, the wrapper can hide the implementation class, particularly when Map is much friendlier to clients.
Changed to return an array instead. Please see the context from Sorabh's comments.
for (Iterator<ObjectObjectCursor<String, DiscoveryNode>> it = dataNodes.iterator(); it.hasNext();) {
    ObjectObjectCursor<String, DiscoveryNode> cursor = it.next();
    if (!isIgnoredNode(cursor.value)) {
Minor. isEligibleNode is easier to read than a double negative.
This is the one place where isIgnoredNode is used with a negation. In other places, like ADClusterEventListener, we don't use a double negative.
I just did a count. There are three double negatives in this PR, two from ADClusterEventListener and one from ClusterStateUtils, and only one use without a second negation. This reduces readability unnecessarily.
Changed to isEligibleNode with a different implementation. Please see it in another PR.
}
for (Map.Entry<String, String> entry : ignoredAttributes.entrySet()) {
    String attribute = node.getAttributes().get(entry.getKey());
    if (attribute != null && attribute.equals(entry.getValue())) {
Minor. Or if (entry.getValue().equals(attribute)) {...}
good point. Changed.
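For readers following the thread, the two forms discussed above can be compared side by side. This is a small illustrative sketch, not code from the PR: the class and method names are hypothetical, and it assumes the expected value is never null (as is the case for a value stored in the ignored-attributes map).

```java
import java.util.Map;

final class NullSafeEqualsSketch {
    // Original form: explicit null check before calling equals on the looked-up value.
    static boolean matchesWithNullCheck(Map<String, String> attributes, String key, String expected) {
        String attribute = attributes.get(key);
        return attribute != null && attribute.equals(expected);
    }

    // Suggested form: call equals on the value known to be non-null,
    // so a missing attribute (null) simply compares as not equal.
    static boolean matchesConstantFirst(Map<String, String> attributes, String key, String expected) {
        return expected.equals(attributes.get(key));
    }
}
```

Both methods return the same result for a present matching value, a present non-matching value, and a missing key; the second form just drops the separate null check.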
private Client client;

public HourlyCron(ClusterService clusterService, Client client) {
    this.clusterService = clusterService;
public HourlyCron(Client client, ClusterStateUtils clientStateUtils) {
Minor. clusterStateUtils
fixed.
Warm nodes are performance-sensitive. We don't want to interfere with them.
    String[] nodeIdsArr = nodesIdsStr.split(",");
    adStatsRequest = new ADStatsRequest(nodeIdsArr);
} else {
    DiscoveryNode[] dataNodes = clusterStateUtils.getEligibleDataNodes().values().toArray(DiscoveryNode.class);
It looks like the map returned by getEligibleDataNodes() is converted to an array everywhere. It would be cleaner to return an array from getEligibleDataNodes() itself.
yes, changed to return an array.
@Inject
public StopDetectorTransportAction(
    TransportService transportService,
    ClusterService clusterService,
    ClusterStateUtils clientStateUtils,
clientStateUtils -> clusterStateUtils ?
This is changed to your DiscoveryNodeFilterer.
import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

public class ClusterStateUtils {
Since it's a utility class, I would suggest giving it static methods so that callers don't have to create an instance. Something like this:
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;
    import org.elasticsearch.cluster.ClusterState;
    import org.elasticsearch.cluster.node.DiscoveryNode;

    public final class DiscoveryNodeFilterer {
        // Utility class: no instances.
        private DiscoveryNodeFilterer() {
        }

        // Returns the data nodes whose box type is hot (or unset).
        public static DiscoveryNode[] getHotDataNodes(ClusterState state) {
            final List<DiscoveryNode> eligibleNodes = new ArrayList<>();
            final HotDataNodePredicate eligibleNodeFilter = new HotDataNodePredicate();
            for (DiscoveryNode node : state.nodes()) {
                if (eligibleNodeFilter.test(node)) {
                    eligibleNodes.add(node);
                }
            }
            return eligibleNodes.toArray(new DiscoveryNode[0]);
        }

        static class HotDataNodePredicate implements Predicate<DiscoveryNode> {
            @Override
            public boolean test(DiscoveryNode discoveryNode) {
                // A node without a box_type attribute is treated as hot.
                return discoveryNode.isDataNode()
                    && CommonName.HOT_BOX_TYPE
                        .equals(discoveryNode.getAttributes().getOrDefault(CommonName.BOX_TYPE_KEY, CommonName.HOT_BOX_TYPE));
            }
        }
    }
Or, if you want to support multiple attributes, you can use DiscoveryNodeFilters. The catch with that implementation is that it doesn't consider a DiscoveryNode with a null value for the attribute to be an eligible node, whereas here a hot data node can have a null value for box_type.
In my implementation, I ignore a node if its box type equals warm. If a node has a null value for the box type attribute, we don't ignore it.
I will use your version since we don't have a use case for supporting multiple attributes right now.
I am using instance methods since they are easier to test than static methods. And since we have dependency injection, we only have one copy of the class.
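As a rough illustration of where this thread ends up (an instance-based filter that returns an array and treats a missing box type as hot), here is a self-contained sketch. It is not the follow-up PR's code: `FakeNode` is a hypothetical stand-in for `DiscoveryNode`, and the literal values `"box_type"` and `"hot"` are assumptions standing in for the `CommonName` constants.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for DiscoveryNode, so the sketch runs without Elasticsearch.
final class FakeNode {
    private final String id;
    private final boolean dataNode;
    private final Map<String, String> attributes;

    FakeNode(String id, boolean dataNode, Map<String, String> attributes) {
        this.id = id;
        this.dataNode = dataNode;
        this.attributes = attributes;
    }

    String getId() { return id; }
    boolean isDataNode() { return dataNode; }
    Map<String, String> getAttributes() { return attributes; }
}

// Instance-based filter: easy to construct directly in a unit test.
final class HotDataNodeFilter {
    static final String BOX_TYPE_KEY = "box_type"; // assumed attribute name
    static final String HOT_BOX_TYPE = "hot";      // assumed attribute value

    // Eligible means: a data node whose box type is hot or not set at all.
    boolean isEligibleNode(FakeNode node) {
        return node.isDataNode()
            && HOT_BOX_TYPE.equals(node.getAttributes().getOrDefault(BOX_TYPE_KEY, HOT_BOX_TYPE));
    }

    // Returns an array directly, so callers don't have to convert a map.
    FakeNode[] getEligibleDataNodes(Iterable<FakeNode> nodes) {
        List<FakeNode> eligible = new ArrayList<>();
        for (FakeNode node : nodes) {
            if (isEligibleNode(node)) {
                eligible.add(node);
            }
        }
        return eligible.toArray(new FakeNode[0]);
    }
}
```

Because the filter is a plain object with no static state, a test can create it with `new HotDataNodeFilter()` and pass in any list of fake nodes, which is the testing convenience mentioned above.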
* Integration with Ultrawarm (#95)
  Ultrawarm introduces warm nodes into the ES cluster. Currently, we distribute model partitions randomly to all data nodes in the cluster, which could degrade model performance once warm nodes are throttled due to resource limitations. This PR excludes warm nodes from model partition placement. Since index shards are hosted on hot nodes, AD's coordinating nodes are on hot nodes as well, so we no longer need to send HourlyCron job and stats requests to warm nodes. This PR implements those changes.
  Testing done:
  1. Verified AD runs only on hot nodes.
  2. The stats API and HourlyCron still work.
* Integration with Ultrawarm - Follow up (#97)
  This is a follow-up PR to address comments.
  Testing done:
  1. gradle build passes.
  2. Verified AD runs only on hot nodes.
  3. The stats API and HourlyCron still work.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.