-
Notifications
You must be signed in to change notification settings - Fork 20
Polarize actions based on impact vectors #332
Conversation
* <p>The collator prunes them to ensure we only take actions that either increase, or decrease | ||
* pressure on a particular node. To resolve conflicts, we prefer stability over performance. | ||
*/ | ||
public class Collator extends Decider { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There can only be one collator node in a graph right ? Can we add that to the comments ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
return proposedActions; | ||
} | ||
|
||
private List<Action> polarize(final NodeKey nodeKey, List<Action> actions) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we can make actions final as well ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done. Took care of the other parameters as well.
if (impactVector.getImpact() | ||
.values() | ||
.stream() | ||
.anyMatch(impact -> impact == Impact.INCREASES_PRESSURE)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Functional programming !! yay !
.get(0), k -> new ArrayList<>()) | ||
.add(action)); | ||
return actionsByNodeId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to turn on some checkstyle rukes, this should have caused an error ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the .get(0) is aligned with .impactedNodes() because they're chained calls, and similarly, .add(action) is aligned with .computeIfAbsent() because they're chained calls.
This is what the formatter did and I thought it was helpful 😅 I'm using the GoogleStyle scheme. Let me know if I should use another formatter.
assertEquals(1, groupedActions.get(nodeKey1).size()); | ||
assertEquals(singleNodeImpactAction1, groupedActions.get(nodeKey1).get(0)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A new line in the end ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
Can you add a label for the PR ? |
In the PR description
Unfortunately, |
public interface ActionGrouper { | ||
|
||
/** | ||
* Groups the given list of actions by the nodes they impact.. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Extra full stop at the end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
for (final Decider decider : deciders) { | ||
List<Decision> decisions = decider.getFlowUnits(); | ||
if (decisions != null) { | ||
decisions.forEach(decision -> proposedActions.addAll(decision.getActions())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can decision.getActions()
return null or empty value ? Do we want to filter for them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if any actions return null
values, but added a null check any way.
List<Action> proposedActions = getAllProposedActions(); | ||
Map<NodeKey, List<Action>> actionsByNode = actionGrouper.groupByNodeId(proposedActions); | ||
List<Action> prunedActions = new ArrayList<>(); | ||
actionsByNode.forEach((k, v) -> prunedActions.addAll(polarize(k, v))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we rename the variables k, v
here to be more expressive of the purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
.anyMatch(impact -> impact == Impact.INCREASES_PRESSURE)) { | ||
pressureIncreasingActions.add(action); | ||
} else { | ||
pressureNonIncreasingActions.add(action); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given "If there are any actions that decrease pressure for a node, prefer that over list of actions that increase pressure.", should we break here when we find a pressureNonIncreasingActions
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there might be other pressure decreasing actions beyond the one that increases it, we need to add that as well, so we can't break early.
@NonNull public Map<NodeKey, List<Action>> groupByNodeId(@NonNull List<Action> actions) { | ||
final Map<NodeKey, List<Action>> actionsByNodeId = new HashMap<>(); | ||
actions.stream() | ||
.filter(action -> action.impactedNodes().size() == 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so, each action can be associated with 1 or 0 impacted Nodes ? Just wanted to clarify this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, is that not the understanding today? This class only considers those actions that impact exactly one node. The multi node impact action grouping will be done in the next PR.
@yojs brought up a similar point, do we create a single action with multiple nodes in the impacted nodes when we want to say decrease cache size for node1, node2, node3 or do we create multiple actions for each node we want to decrease the cache size for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I clarified this with others and we will have actions which impact a single node and then another set of action which are for multiple nodes.
Taking the above example, cache increase action will be per node but for cache decrease, we will have 1 action with multiple nodes in the impactedNodes
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actions can impact a single node or multiple nodes - this depends on the type of action. For a given action type (a class implementing Action interface), this stays fixed.
@khushbr's example above is slightly incorrect. ModifyCacheAction
impacts only a single node. If we want to increase or decrease caches across multiple nodes, the decider will have to create multiple such actions.
There will be other types of actions in future, like MoveShard
, which can impact multiple nodes (src node and dest node) or SplitShard
which impacts even more nodes. For those actions, impactedNodes
will contain all the nodes impacted and impact()
will return the impactVector
for each node - e.g. for MoveShard
impact() will indicate that pressure gets reduced on src node and increased on the destination node.
* @param actions The list of actions that need to be grouped. | ||
* @return A map of actions grouped by nodes they impact. | ||
*/ | ||
@NonNull Map<NodeKey, List<Action>> groupByNodeId(@NonNull final List<Action> actions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename the method name to include InstanceName
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup, renamed it to groupByInstanceId
.
private final ActionGrouper actionGrouper; | ||
|
||
public Collator(long evalIntervalSeconds, Decider... deciders) { | ||
this(evalIntervalSeconds, new SingleNodeImpactActionGrouper(), deciders); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We plan to remove the evalIntervalSeconds
as we wanted the scheduler to be simple - it would call each graphNode at every tick and then the node should decide how to handle if this is before the periodicity. In that vein, can we add a constant in Node
class and use that and not even take evalIntervalSeconds
as a constructor parameter ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed evalIntervalSeconds
ctor parameter in Collator. It should be one less graph node to worry about when we remove the parameter permanently :)
} | ||
|
||
@Override | ||
public Decision operate() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it might be a good idea to add a javadoc comment for the operate()
as this is the center piece of the RCAGraph node ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
public class SingleNodeImpactActionGrouper implements ActionGrouper { | ||
|
||
@Override | ||
@NonNull public Map<NodeKey, List<Action>> groupByNodeId(@NonNull List<Action> actions) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, we want the collator to come up with one action per node or one action per resource per node ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collator does not filter beyond polarizing the actions. It will return as many as possible after making sure that they align in terms of the impact on the instance so that the publisher and other components can filter based on other parameters.
@NonNull public Map<NodeKey, List<Action>> groupByNodeId(@NonNull List<Action> actions) { | ||
final Map<NodeKey, List<Action>> actionsByNodeId = new HashMap<>(); | ||
actions.stream() | ||
.filter(action -> action.impactedNodes().size() == 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment why are we filtering out the actions that ask for the same action to be performed on multiple nodes ? Is that rationale that we do it only one node per iteration ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting point, don't we create multiple actions in such cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we are creating one action per node even if its the same action for multiple nodes, added a comment.
Codecov Report
@@ Coverage Diff @@
## master #332 +/- ##
============================================
+ Coverage 69.92% 70.11% +0.19%
- Complexity 2242 2282 +40
============================================
Files 301 303 +2
Lines 13481 13605 +124
Branches 1116 1133 +17
============================================
+ Hits 9426 9539 +113
- Misses 3690 3691 +1
- Partials 365 375 +10
Continue to review full report at Codecov.
|
List<Decision> decisions = decider.getFlowUnits(); | ||
if (decisions != null) { | ||
decisions.forEach(decision -> { | ||
if (decision.getActions() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use decision.isEmpty()
here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will change in the next rev
@NonNull public Map<NodeKey, List<Action>> groupByNodeId(@NonNull List<Action> actions) { | ||
final Map<NodeKey, List<Action>> actionsByNodeId = new HashMap<>(); | ||
actions.stream() | ||
.filter(action -> action.impactedNodes().size() == 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actions can impact a single node or multiple nodes - this depends on the type of action. For a given action type (a class implementing Action interface), this stays fixed.
@khushbr's example above is slightly incorrect. ModifyCacheAction
impacts only a single node. If we want to increase or decrease caches across multiple nodes, the decider will have to create multiple such actions.
There will be other types of actions in future, like MoveShard
, which can impact multiple nodes (src node and dest node) or SplitShard
which impacts even more nodes. For those actions, impactedNodes
will contain all the nodes impacted and impact()
will return the impactVector
for each node - e.g. for MoveShard
impact() will indicate that pressure gets reduced on src node and increased on the destination node.
// Classify the action as pressure increasing action if the impact for any dimension is | ||
// increasing pressure. | ||
if (impactVector.getImpact() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This classifies an increase across any dimension as increasing pressure. Shouldn't we check on a per dimension basis? Otherwise there is no value in having different dimensions in the impact vector.
Suppose you have an action to offload some data from heap to a file on disk. This would reduce heap but increase disk pressure. If there is no other action trying to decrease disk pressure, then the overall disk on the node is not under contention and we should let this action go through.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, I assumed for the first iteration we would not have such actions and had not thought of polarizing at a dimension level. I will change it.
return proposedActions; | ||
} | ||
|
||
private List<Action> polarize(final NodeKey nodeKey, final List<Action> actions) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that this will need non-trivial changes when we have actions impacting multiple nodes. If an action is purged because of pressure conflict on one of the nodes, it must be removed from the list of all of the nodes.
Should polarize()
then move to ActionGrouper
? Is the overall above structure right for future use cases?
My ideas for solving this were more around compiling all impact vectors into a matrix and doing a second pass through it to accept or prune actions. Something on these lines:
Suppose the collator received 2 actions, moveShard from node A to node B, and moveShard from node B to node C
actions = [MoveShard: A -> B, MoveShard: B -> C]
Impact vectors:
actions[0] :: MoveShard: A -> B
impactedNodes = [A, B]
A: [CPU:dec, Heap: dec, RAM: dec, NW: dec, Disk: dec]
B: [CPU:inc, Heap: inc, RAM: inc, NW: inc, Disk: inc]
actions[1] :: MoveShard: B -> C
impactedNodes = [B, C]
B: [CPU:dec, Heap: dec, RAM: dec, NW: dec, Disk: dec]
C: [CPU:inc, Heap: inc, RAM: inc, NW: inc, Disk: inc]
First we create an impact matrix for the cluster:
- node A has only decrease across each dim
- B has 1 inc and 1 dec action,
- C has 1 inc action for each dim
A B C
cpu i=0,d=1 i=1,d=1 i=1,d=0
heap i=0,d=1 i=1,d=1 i=1,d=0
ram i=0,d=1 i=1,d=1 i=1,d=0
nw i=0,d=1 i=1,d=1 i=1,d=0
disk i=0,d=1 i=1,d=1 i=1,d=0
Now we run the actions through this matrix again.
If an action is increasing pressure on a dim on a node, it is only allowed
if there is no other action trying to dec that dim on the node. i.e. d=0 for that node,dim.
When an action gets purged in this second pass, we remove it from the matrix by reducing the node,dim counters.
In above e.g. we process MoveShard(A->B)
-
- It cannot go through because
d=1
for some dims on node B - We purge
MoveShard(A->B)
- Counters on A and B are updated. All counters in A become
i=0, d=0
. B becomesi=0,d=1
Now we process MoveShard(B->C)
-
- It increases pressure on C but there is no one decreasing pressure (
d=0
for all dims on C), so it can go through - On B it is dec pressure anyway so it has a clear pass.
Hence MoveShard(B->C)
gets picked.
This will also resolve cycles like A->B, B->C, C->A by picking either one or two of the actions (depending on order of 2nd pass).
It is okay if we handle multi-node actions in a separate PR later. You may also use different/better algorithms for polarizing impact vectors. Just wanted to check if the current Collator::polarize()
and ActionGrouper::groupByInstanceId()
is the right approach for future use cases, like the moveShard, splitShard etc..
It will also be good to at least handle the dimension level comparisons right away, as they help establish a structure for all use cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this sage advice, I had an unnecessarily complex system and I was able to rip it out until I got to something similar to what you've proposed here :) I've updated the PR to reflect these changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good @ktkrg, clean and well documented. Thanks for making these changes.
* to most. | ||
*/ | ||
@VisibleForTesting | ||
static final class ImpactBasedActionComparator implements Comparator<Action>, Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting implementation.
@NonNull final ImpactVector impactVector) { | ||
boolean isAligned = true; | ||
|
||
// If this is an action that increases pressure along some dimension for this node, and the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a useful comment, thanks for adding
*/ | ||
public boolean checkAlignmentAcrossDimensions(@NonNull final String actionName, | ||
@NonNull final ImpactVector impactVector) { | ||
boolean isAligned = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move to after the comment string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
final Map<Dimension, Impact> impactMap = impactVector.getImpact(); | ||
for (final Map.Entry<Dimension, Impact> entry : impactMap.entrySet()) { | ||
final Impact impactOnDimension = entry.getValue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use impactMap.foreach(dimension, impact) {...}
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't use forEach
here because we will be updating a local variable inside the body of the lambda and it needs the isAligned
value to be effectively final. Since local variables are not effectively final, we'll need to declare an array or a list of size=1 to hold the isAligned
value.
I chose the verbose way to avoid some confusion there.
List<String> pressureDecreasingActions = perDimensionPressureDecreasingActions | ||
.getOrDefault(entry.getKey(), Collections.emptyList()); | ||
isAligned = pressureDecreasingActions.isEmpty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about:
isAligned = !pressureDecreasingActions.contains(dimension);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
map.computeIfAbsent(dimension, | ||
dim -> new ArrayList<>()).add(actionName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: single line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
return impactVector; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add newline
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Do you think we can add integ tests for this ? |
Issue #:
Fixes #331
Description of changes:
This change adds the ability for the collator to group decider actions by nodes and polarize the actions based on their impact vectors.
The collator looks at various actions suggested by the deciders and first groups them by the node those actions impact.
For each node, if there is an action suggested that would go in the direction of decreasing pressure, then only choose(a la polarize) pressure decreasing actions. Only if there is no pressure decreasing action suggested for a node, then choose actions that increase pressure.
Tests:
Unit tests and tested on docker.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.