Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Polarize actions based on impact vectors #332

Merged
merged 12 commits into from
Aug 31, 2020

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ActionListener;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.FlipFlopDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.TimedFlipFlopDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator.Collator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import com.google.common.annotations.VisibleForTesting;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.List;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
* A grouping interface to provide different ways to group actions as needed by the {@link
* Collator}
*/
public interface ActionGrouper {

/**
* Groups the given list of actions by the nodes they impact..
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Extra full stop at the end.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

*
* @param actions The list of actions that need to be grouped.
* @return A map of actions grouped by nodes they impact.
*/
@NonNull Map<NodeKey, List<Action>> groupByNodeId(@NonNull final List<Action> actions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename the method name to include InstanceName ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, renamed it to groupByInstanceId.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Impact;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Decider;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Decision;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
* Collator collects and prunes the candidate decisions from each decider so that their impacts are
* aligned
*
* <p>Decisions can increase or decrease pressure on different key resources on an Elasticearch
* node. This is encapsulated in each Action via the {@link ImpactVector}. Since each decider
* independently evaluates its decision, it is possible to have conflicting ImpactVectors from
* actions across deciders.
*
* <p>The collator prunes them to ensure we only take actions that either increase, or decrease
* pressure on a particular node. To resolve conflicts, we prefer stability over performance.
*/
public class Collator extends Decider {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can only be one collator node in a graph right ? Can we add that to the comments ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


public static final String NAME = "collator";

/* Deciders can choose to publish decisions at different frequencies based on the
* type of resources monitored and rca signals. The collator should however, not introduce any
* unnecessary delays. As soon as a decision is taken, it should be evaluated and published downstream.
*/
private static final int collatorFrequency = 1; // Measured in terms of number of evaluationIntervalPeriods

private final List<Decider> deciders;

private final ActionGrouper actionGrouper;

public Collator(long evalIntervalSeconds, Decider... deciders) {
this(evalIntervalSeconds, new SingleNodeImpactActionGrouper(), deciders);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We plan to remove the evalIntervalSeconds as we wanted the scheduler to be simple - it would call each graphNode at every tick and then the node should decide how to handle if this is before the periodicity. In that vein, can we add a constant in Node class and use that and not even take evalIntervalSeconds as a constructor parameter ?

Copy link
Contributor Author

@ktkrg ktkrg Aug 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed evalIntervalSeconds ctor parameter in Collator. It should be one less graph node to worry about when we remove the parameter permanently :)

}

public Collator(long evalIntervalSeconds, ActionGrouper actionGrouper, Decider... deciders) {
super(evalIntervalSeconds, collatorFrequency);
this.deciders = Arrays.asList(deciders);
this.actionGrouper = actionGrouper;
}

@Override
public String name() {
return NAME;
}

@Override
public Decision operate() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be a good idea to add a javadoc comment for the operate() as this is the center piece of the RCAGraph node ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

List<Action> proposedActions = getAllProposedActions();
Map<NodeKey, List<Action>> actionsByNode = actionGrouper.groupByNodeId(proposedActions);
List<Action> prunedActions = new ArrayList<>();
actionsByNode.forEach((k, v) -> prunedActions.addAll(polarize(k, v)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we rename the variables k, v here to be more expressive of the purpose.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


Decision finalDecision = new Decision(System.currentTimeMillis(), NAME);
finalDecision.addAllActions(prunedActions);
return finalDecision;
}

@NonNull private List<Action> getAllProposedActions() {
final List<Action> proposedActions = new ArrayList<>();
if (deciders != null) {
for (final Decider decider : deciders) {
List<Decision> decisions = decider.getFlowUnits();
if (decisions != null) {
decisions.forEach(decision -> proposedActions.addAll(decision.getActions()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can decision.getActions() return null or empty value ? Do we want to filter for them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if any actions return null values, but added a null check any way.

}
}
}
return proposedActions;
}

private List<Action> polarize(final NodeKey nodeKey, List<Action> actions) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can make actions final as well ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. Took care of the other parameters as well.

List<Action> pressureIncreasingActions = new ArrayList<>();
List<Action> pressureNonIncreasingActions = new ArrayList<>();

for (final Action action : actions) {
ImpactVector impactVector = action.impact().getOrDefault(nodeKey, new ImpactVector());

// Classify the action as pressure increasing action if the impact for any dimension is
// increasing pressure.
if (impactVector.getImpact()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This classifies an increase across any dimension as increasing pressure. Shouldn't we check on a per dimension basis? Otherwise there is no value in having different dimensions in the impact vector.

Suppose you have an action to offload some data from heap to a file on disk. This would reduce heap but increase disk pressure. If there is no other action trying to decrease disk pressure, then the overall disk on the node is not under contention and we should let this action go through.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I assumed for the first iteration we would not have such actions and had not thought of polarizing at a dimension level. I will change it.

.values()
.stream()
.anyMatch(impact -> impact == Impact.INCREASES_PRESSURE)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional programming !! yay !

pressureIncreasingActions.add(action);
} else {
pressureNonIncreasingActions.add(action);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given "If there are any actions that decrease pressure for a node, prefer that over list of actions that increase pressure.", should we break here when we find a pressureNonIncreasingActions ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there might be other pressure decreasing actions beyond the one that increases it, we need to add that as well, so we can't break early.

}
}

// If there are any actions that decrease pressure for a node, prefer that over list of
// actions that increase pressure.
if (pressureNonIncreasingActions.size() > 0) {
return pressureNonIncreasingActions;
}

// Return list of actions that increase pressure only if no decider has proposed an action
// that will relieve pressure for this node.
return pressureIncreasingActions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.NonNull;

public class SingleNodeImpactActionGrouper implements ActionGrouper {

@Override
@NonNull public Map<NodeKey, List<Action>> groupByNodeId(@NonNull List<Action> actions) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we want the collator to come up with one action per node or one action per resource per node ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collator does not filter beyond polarizing the actions. It will return as many as possible after making sure that they align in terms of the impact on the instance so that the publisher and other components can filter based on other parameters.

final Map<NodeKey, List<Action>> actionsByNodeId = new HashMap<>();
actions.stream()
.filter(action -> action.impactedNodes().size() == 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, each action can be associated with 1 or 0 impacted Nodes ? Just wanted to clarify this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, is that not the understanding today? This class only considers those actions that impact exactly one node. The multi node impact action grouping will be done in the next PR.

@yojs brought up a similar point, do we create a single action with multiple nodes in the impacted nodes when we want to say decrease cache size for node1, node2, node3 or do we create multiple actions for each node we want to decrease the cache size for?

Copy link
Contributor

@khushbr khushbr Aug 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I clarified this with others and we will have actions which impact a single node and then another set of action which are for multiple nodes.

Taking the above example, cache increase action will be per node but for cache decrease, we will have 1 action with multiple nodes in the impactedNodes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actions can impact a single node or multiple nodes - this depends on the type of action. For a given action type (a class implementing Action interface), this stays fixed.

@khushbr's example above is slightly incorrect. ModifyCacheAction impacts only a single node. If we want to increase or decrease caches across multiple nodes, the decider will have to create multiple such actions.

There will be other types of actions in future, like MoveShard, which can impact multiple nodes (src node and dest node) or SplitShard which impacts even more nodes. For those actions, impactedNodes will contain all the nodes impacted and impact() will return the impactVector for each node - e.g. for MoveShard impact() will indicate that pressure gets reduced on src node and increased on the destination node.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment why are we filtering out the actions that ask for the same action to be performed on multiple nodes ? Is that rationale that we do it only one node per iteration ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting point, don't we create multiple actions in such cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are creating one action per node even if its the same action for multiple nodes, added a comment.

.forEach(action -> actionsByNodeId.computeIfAbsent(action.impactedNodes()
.get(0), k -> new ArrayList<>())
.add(action));
return actionsByNodeId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to turn on some checkstyle rukes, this should have caused an error ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the .get(0) is aligned with .impactedNodes() because they're chained calls, and similarly, .add(action) is aligned with .computeIfAbsent() because they're chained calls.

This is what the formatter did and I thought it was helpful 😅 I'm using the GoogleStyle scheme. Let me know if I should use another formatter.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.TAG_LOCUS;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.CacheHealthDecider;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Collator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Publisher;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.QueueHealthDecider;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator.Collator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ShardStatsDerivedDimension;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
Expand Down Expand Up @@ -103,7 +103,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,18 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ActionListener;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator.Collator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.plugins.Plugin;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import com.google.common.collect.Lists;

import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.Before;
import org.junit.Test;

import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
Expand Down
Loading