diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/Collator.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/Collator.java
deleted file mode 100644
index 803ae958a..000000000
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/Collator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders;
-
-import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Collator collects and prunes the candidate decisions from each decider so that their impacts are
- * aligned
- *
- *
Decisions can increase or decrease pressure on different key resources on an Elasticearch
- * node. This is encapsulated in each Action via the {@link ImpactVector}. Since each decider
- * independently evaluates its decision, it is possible to have conflicting ImpactVectors from
- * actions across deciders.
- *
- *
The collator prunes them to ensure we only take actions that either increase, or decrease
- * pressure on a particular node. To resolve conflicts, we prefer stability over performance.
- */
-public class Collator extends Decider {
-
- public static final String NAME = "collator";
-
- /* Deciders can choose to publish decisions at different frequencies based on the
- * type of resources monitored and rca signals. The collator should however, not introduce any
- * unnecessary delays. As soon as a decision is taken, it should be evaluated and published downstream.
- */
- private static final int collatorFrequency = 1; // Measured in terms of number of evaluationIntervalPeriods
-
- private List deciders;
-
- public Collator(long evalIntervalSeconds, Decider... deciders) {
- super(evalIntervalSeconds, collatorFrequency);
- this.deciders = Arrays.asList(deciders);
- }
-
- @Override
- public String name() {
- return NAME;
- }
-
- @Override
- public Decision operate() {
- // This is a simple pass-through collator implementation
- // TODO: Prune actions by their ImpactVectors
-
- Decision finalDecision = new Decision(System.currentTimeMillis(), NAME);
- for (Decider decider : deciders) {
- Decision decision = decider.getFlowUnits().get(0);
- finalDecision.addAllActions(decision.getActions());
- }
- return finalDecision;
- }
-}
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/Publisher.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/Publisher.java
index a5e9228f2..4c9e20ea8 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/Publisher.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/Publisher.java
@@ -21,6 +21,7 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.CoolOffDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.FlipFlopDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.TimedFlipFlopDetector;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator.Collator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/Collator.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/Collator.java
new file mode 100644
index 000000000..c437fb934
--- /dev/null
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/Collator.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Impact;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Decider;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Decision;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.AnalysisGraph;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Collator collects and prunes the candidate decisions from each decider so that their impacts are
+ * aligned.
+ *
+ * Decisions can increase or decrease pressure on different key resources on an Elasticsearch
+ * node. This is encapsulated in each Action via the {@link ImpactVector}. Since each decider
+ * independently evaluates its decision, it is possible to have conflicting ImpactVectors from
+ * actions across deciders.
+ *
+ *
The collator prunes them to ensure we only take actions that either increase, or decrease
+ * pressure on a particular node's resources. To resolve conflicts, we prefer stability over
+ * performance. In order for the above guarantee to work, there should be only one collator instance
+ * in an {@link AnalysisGraph}.
+ */
+public class Collator extends Decider {
+
+ public static final String NAME = "collator";
+
+ /* Deciders can choose to publish decisions at different frequencies based on the
+ * type of resources monitored and rca signals. The collator should however, not introduce any
+ * unnecessary delays. As soon as a decision is taken, it should be evaluated and published downstream.
+ */
+ private static final int collatorFrequency = 1; // Measured in terms of number of evaluationIntervalPeriods
+
+ private static final int evalIntervalSeconds = 5;
+
+ private final ImpactAssessor impactAssessor;
+
+ private final List deciders;
+
+ private final Comparator actionComparator;
+
+ public Collator(Decider... deciders) {
+ super(evalIntervalSeconds, collatorFrequency);
+ this.deciders = Arrays.asList(deciders);
+ this.actionComparator = new ImpactBasedActionComparator();
+ this.impactAssessor = new ImpactAssessor();
+ }
+
+ /**
+ * Constructor used for unit testing purposes only.
+ *
+ * @param impactAssessor the impact assessor.
+ * @param actionComparator comparator for sorting actions.
+ * @param deciders The participating deciders.
+ */
+ @VisibleForTesting
+ public Collator(final ImpactAssessor impactAssessor, final Comparator actionComparator,
+ Decider... deciders) {
+ super(evalIntervalSeconds, collatorFrequency);
+ this.deciders = Arrays.asList(deciders);
+ this.actionComparator = actionComparator;
+ this.impactAssessor = impactAssessor;
+ }
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ /**
+ * Process all the actions proposed by the deciders and prune them based on their impact vectors.
+ *
+ * @return A {@link Decision} instance that contains the list of polarized actions.
+ */
+ @Override
+ public Decision operate() {
+ Decision finalDecision = new Decision(System.currentTimeMillis(), NAME);
+ List allowedActions = new ArrayList<>();
+
+ // First get all the actions proposed by the deciders and assess the overall impact all
+ // actions combined have on all the affected nodes.
+
+ List allActions = getProposedActions();
+ Map overallImpactAssessment =
+ impactAssessor.assessOverallImpact(allActions);
+
+ // We need to identify and prune conflicting actions based on the overall impact. In order to
+ // do that, we re-assess each of the proposed actions with the overall impact assessment in
+ // mind. In each such assessment, we ensure the impact of an action aligns with the instance's
+ // current pressure heading(increasing/decreasing). Actions that don't align are pruned and
+ // their effects on the overall impact are undone. As the order in which we reassess matters
+ // as to what actions get picked and what don't, we sort the list of actions based on a
+ // simple heuristic where actions that reduce pressure the most are re-assessed later
+ // thereby decreasing the chance of them getting pruned because of another action.
+
+ allActions.sort(actionComparator);
+ allActions.forEach(action -> {
+ if (impactAssessor.isImpactAligned(action, overallImpactAssessment)) {
+ allowedActions.add(action);
+ } else {
+ impactAssessor.undoActionImpactOnOverallAssessment(action, overallImpactAssessment);
+ }
+ });
+
+ finalDecision.addAllActions(allowedActions);
+ return finalDecision;
+ }
+
+ /**
+ * Combines all actions proposed by the deciders into a list.
+ *
+ * @return A list of actions.
+ */
+ @NonNull
+ private List getProposedActions() {
+ final List proposedActions = new ArrayList<>();
+ if (deciders != null) {
+ for (final Decider decider : deciders) {
+ List decisions = decider.getFlowUnits();
+ decisions.forEach(decision -> {
+ if (!decision.getActions().isEmpty()) {
+ proposedActions.addAll(decision.getActions());
+ }
+ });
+ }
+ }
+ return proposedActions;
+ }
+
+ /**
+ * A comparator for actions to sort them based on their impact from least pressure decreasing
+ * to most.
+ */
+ @VisibleForTesting
+ static final class ImpactBasedActionComparator implements Comparator, Serializable {
+
+ @Override
+ public int compare(Action action1, Action action2) {
+ int numberOfPressureReductions1 = getImpactedDimensionCount(action1, Impact.DECREASES_PRESSURE);
+ int numberOfPressureReductions2 = getImpactedDimensionCount(action2, Impact.DECREASES_PRESSURE);
+
+ if (numberOfPressureReductions1 != numberOfPressureReductions2) {
+ return numberOfPressureReductions1 - numberOfPressureReductions2;
+ }
+
+ int numberOfPressureIncreases1 = getImpactedDimensionCount(action1, Impact.INCREASES_PRESSURE);
+ int numberOfPressureIncreases2 = getImpactedDimensionCount(action2, Impact.INCREASES_PRESSURE);
+
+ if (numberOfPressureIncreases1 != numberOfPressureIncreases2) {
+ return numberOfPressureIncreases2 - numberOfPressureIncreases1;
+ }
+
+ return 0;
+ }
+
+ private int getImpactedDimensionCount(final Action action, Impact requiredImpact) {
+ int count = 0;
+ for (ImpactVector impactVector : action.impact().values()) {
+ for (Impact impact : impactVector.getImpact().values()) {
+ if (impact.equals(requiredImpact)) {
+ count++;
+ }
+ }
+ }
+ return count;
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/ImpactAssessment.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/ImpactAssessment.java
new file mode 100644
index 000000000..dc4adbfda
--- /dev/null
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/ImpactAssessment.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Impact;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * ImpactAssessment maintains and helps with updating impacts of actions on a node.
+ */
+public class ImpactAssessment {
+
+ private static final Logger LOG = LogManager.getLogger(ImpactAssessment.class);
+ private final NodeKey nodeKey;
+
+ private final Map> perDimensionPressureDecreasingActions;
+ private final Map> perDimensionPressureIncreasingActions;
+
+ public ImpactAssessment(final NodeKey nodeKey) {
+ this.nodeKey = nodeKey;
+ this.perDimensionPressureDecreasingActions = new HashMap<>();
+ this.perDimensionPressureIncreasingActions = new HashMap<>();
+ }
+
+ /**
+ * Adds the action's impact to the current overall impact of all proposed actions for this node
+ * so far.
+ * @param actionName The name of the action.
+ * @param impactVector The impact vector which gives a pressure heading for various resources
+ * impacted by taking this action.
+ */
+ public void addActionImpact(@NonNull final String actionName,
+ @NonNull final ImpactVector impactVector) {
+ final Map impactMap = impactVector.getImpact();
+
+ impactMap.forEach((dimension, impact) -> {
+ switch (impact) {
+ case INCREASES_PRESSURE:
+ addActionToMap(perDimensionPressureIncreasingActions, actionName, dimension);
+ break;
+ case DECREASES_PRESSURE:
+ addActionToMap(perDimensionPressureDecreasingActions, actionName, dimension);
+ break;
+ case NO_IMPACT:
+ break;
+ default:
+ LOG.warn("Unknown impact value: {} encountered while adding action: {}'s impact",
+ impact, actionName);
+ }
+ });
+ }
+
+ /**
+ * Removes an action's impact from the current overall impact of all proposed actions for this
+ * node so far.
+ * @param actionName The name of the action.
+ * @param impactVector The impact vector which gives a pressure heading for various resources
+ * impacted by taking this action.
+ */
+ public void removeActionImpact(@NonNull final String actionName,
+ @NonNull ImpactVector impactVector) {
+ final Map impactMap = impactVector.getImpact();
+
+ impactMap.forEach((dimension, impact) -> {
+ switch (impact) {
+ case INCREASES_PRESSURE:
+ removeActionFromMap(perDimensionPressureIncreasingActions, actionName, dimension);
+ break;
+ case DECREASES_PRESSURE:
+ removeActionFromMap(perDimensionPressureDecreasingActions, actionName, dimension);
+ break;
+ case NO_IMPACT:
+ break;
+ default:
+ LOG.warn("Unknown impact value: {} encountered while removing action: {}'s impact",
+ impact, actionName);
+ }
+ });
+ }
+
+ /**
+ * Checks if the given impact vector aligns with the current overall impact for this node.
+ * Alignment is checked when reassessing the actions where all impacts are replayed.
+ *
+ * @param actionName The name of the action.
+ * @param impactVector The impact vector which gives a pressure heading for various resources
+ * impacted by taking this action.
+ * @return true if the impact vector aligns with the overall impact, false otherwise.
+ */
+ public boolean checkAlignmentAcrossDimensions(@NonNull final String actionName,
+ @NonNull final ImpactVector impactVector) {
+ // If this is an action that increases pressure along some dimension for this node, and the
+ // overall assessment says there are actions that decrease pressure along those same
+ // dimensions, then this action is not aligned with the other proposed actions where the
+ // deciders are trying to reduce pressure for those dimensions.
+ boolean isAligned = true;
+
+ final Map impactMap = impactVector.getImpact();
+ for (final Map.Entry entry : impactMap.entrySet()) {
+ final Impact impactOnDimension = entry.getValue();
+ if (isAligned && impactOnDimension.equals(Impact.INCREASES_PRESSURE)) {
+ isAligned = !perDimensionPressureDecreasingActions.containsKey(entry.getKey());
+
+ if (!isAligned) {
+ LOG.info("action: {}'s impact is not aligned with node: {}'s overall impact for "
+ + "dimension: {}. Found pressure decreasing actions: {}", actionName, nodeKey,
+ entry.getKey(), perDimensionPressureDecreasingActions.getOrDefault(entry.getKey(),
+ Collections.emptyList()));
+ }
+ }
+ }
+
+ return isAligned;
+ }
+
+ private void addActionToMap(@NonNull final Map> map,
+ @NonNull final String actionName, @NonNull final Dimension dimension) {
+ map.computeIfAbsent(dimension, dim -> new ArrayList<>()).add(actionName);
+ }
+
+ private void removeActionFromMap(@NonNull final Map> map,
+ @NonNull final String actionName, @NonNull final Dimension dimension) {
+ final List actions = map.get(dimension);
+ if (actions != null) {
+ actions.remove(actionName);
+ if (actions.isEmpty()) {
+ map.remove(dimension);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/ImpactAssessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/ImpactAssessor.java
new file mode 100644
index 000000000..e0aa74aa0
--- /dev/null
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/ImpactAssessor.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * ImpactAssessor is responsible for assessing the impact of various actions on the nodes and
+ * determining if an action is currently aligned in the same direction as the node's current
+ * pressure heading in the presence of all proposed actions.
+ */
+public class ImpactAssessor {
+
+ private static final Logger LOG = LogManager.getLogger(ImpactAssessor.class);
+
+ /**
+ * Combines the pressure characteristics of the given list of actions into an overall impact
+ * assessment per node.
+ *
+ * @param actions The list of actions whose for which the impact need to assessed.
+ * @return A map of instance to its overall impact on the instance based on the provided list of
+ * actions.
+ */
+ public @NonNull Map assessOverallImpact(
+ @NonNull final List actions) {
+ Map overallImpactAssessment = new HashMap<>();
+ actions.forEach(action -> {
+ Map impactMap = action.impact();
+ impactMap.forEach((nodeKey, impactVector) -> overallImpactAssessment.computeIfAbsent(nodeKey,
+ ImpactAssessment::new).addActionImpact(action.name(), impactVector));
+ });
+
+ return overallImpactAssessment;
+ }
+
+ /**
+ * Checks if the impact of a given action aligns with the overall proposed impact for a node. An
+ * action is classified as 'impact aligning' only if all the impacted nodes in the action align
+ * with their proposed pressure heading.
+ *
+ * @param action the action whose impact needs to be checked for alignment.
+ * @param overallImpactAssessment The impact assessment that provides the pressure heading for the
+ * nodes.
+ * @return true if all impacted nodes are in alignment.
+ */
+ public boolean isImpactAligned(@NonNull final Action action,
+ @NonNull final Map overallImpactAssessment) {
+
+ boolean isAligned = true;
+
+ for (final NodeKey nodeKey : action.impactedNodes()) {
+ if (!overallImpactAssessment.containsKey(nodeKey)) {
+ LOG.error("Overall impact assessment does not a node key: {} for which an impacting action "
+ + "exists.", nodeKey);
+ return false;
+ }
+
+ final ImpactAssessment nodeImpactAssessment = overallImpactAssessment.get(nodeKey);
+
+ isAligned = isAligned && nodeImpactAssessment.checkAlignmentAcrossDimensions(action.name(),
+ action.impact().get(nodeKey));
+ }
+
+ return isAligned;
+ }
+
+ public void undoActionImpactOnOverallAssessment(@NonNull final Action action,
+ @NonNull final Map overallImpactAssessment) {
+ for (final NodeKey nodeKey : action.impactedNodes()) {
+ if (!overallImpactAssessment.containsKey(nodeKey)) {
+ LOG.error("Overall impact assessment does not a node key: {} for which an impacting action "
+ + "exists.", nodeKey);
+ return;
+ }
+
+ final ImpactAssessment nodeImpactAssessment = overallImpactAssessment.get(nodeKey);
+ nodeImpactAssessment.removeActionImpact(action.name(), action.impact().get(nodeKey));
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/Node.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/Node.java
index 29e18cbfc..d547eff93 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/Node.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/Node.java
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.checkerframework.checker.nullness.qual.NonNull;
public abstract class Node {
// TODO: Make bounds explicit
@@ -168,6 +169,7 @@ public String toString() {
return name();
}
+ @NonNull
public List getFlowUnits() {
List allFlowUnits = flowUnits == null ? new ArrayList<>() : new ArrayList<>(flowUnits);
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/ElasticSearchAnalysisGraph.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/ElasticSearchAnalysisGraph.java
index 77fb222e9..30317ff27 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/ElasticSearchAnalysisGraph.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/ElasticSearchAnalysisGraph.java
@@ -22,9 +22,9 @@
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.TAG_LOCUS;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.CacheHealthDecider;
-import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Collator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Publisher;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.QueueHealthDecider;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator.Collator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ShardStatsDerivedDimension;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
@@ -103,7 +103,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -274,7 +273,7 @@ public void construct() {
//constructResourceHeatMapGraph();
// Collator - Collects actions from all deciders and aligns impact vectors
- Collator collator = new Collator(EVALUATION_INTERVAL_SECONDS, queueHealthDecider, cacheHealthDecider);
+ Collator collator = new Collator(queueHealthDecider, cacheHealthDecider);
collator.addTag(TAG_LOCUS, LOCUS_MASTER_NODE);
collator.addAllUpstreams(Arrays.asList(queueHealthDecider, cacheHealthDecider));
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/PublisherTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/PublisherTest.java
index 58433fdf0..e1058b920 100644
--- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/PublisherTest.java
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/PublisherTest.java
@@ -22,28 +22,24 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.CoolOffDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator.Collator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.plugins.Plugin;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails.Id;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails.Ip;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import com.google.common.collect.Lists;
-
import java.time.Clock;
import java.time.Duration;
-import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
-
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/CollatorTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/CollatorTest.java
new file mode 100644
index 000000000..d1f647acb
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/CollatorTest.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Decider;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Decision;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator.Collator.ImpactBasedActionComparator;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails.Id;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails.Ip;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public class CollatorTest {
+
+ private Collator testCollator;
+
+ @Mock
+ private Decider mockDecider1;
+
+ @Mock
+ private Decider mockDecider2;
+
+ @Mock
+ private Comparator mockComparator;
+
+ @Mock
+ private Decision decision1;
+
+ @Mock
+ private Decision decision2;
+
+ @Mock
+ private Action moveShardAction1;
+
+ @Mock
+ private Action moveShardAction2;
+
+ @Mock
+ private Action moveShardAction3;
+
+ private final ImpactAssessor impactAssessor = new ImpactAssessor();
+
+ private final NodeKey nodeA = new NodeKey(new Id("node A"), new Ip("1.2.3.4"));
+ private final NodeKey nodeB = new NodeKey(new Id("node B"), new Ip("5.6.7.8"));
+ private final NodeKey nodeC = new NodeKey(new Id("node C"), new Ip("9.10.11.12"));
+
+ private String moveShardActionName = "MoveShard";
+
+ private Map moveShardImpact1 = ImmutableMap.of(
+ nodeA, buildShardMoveOutImpactVector(),
+ nodeB, buildShardMoveInImpactVector()
+ );
+
+ private Map moveShardImpact2 = ImmutableMap.of(
+ nodeB, buildShardMoveOutImpactVector(),
+ nodeC, buildShardMoveInImpactVector()
+ );
+
+ private Map moveShardImpact3 = ImmutableMap.of(
+ nodeC, buildShardMoveOutImpactVector(),
+ nodeA, buildShardMoveInImpactVector()
+ );
+
+ @Before
+ public void setup() {
+ initMocks(this);
+ this.testCollator = new Collator(impactAssessor, mockComparator, mockDecider1,
+ mockDecider2);
+ setupActions();
+ setupDecisions();
+ }
+
+ @Test
+ public void testCollatorAcyclicImpactDecisions() {
+ when(mockDecider1.getFlowUnits()).thenReturn(Collections.singletonList(decision1));
+ when(mockDecider2.getFlowUnits()).thenReturn(Collections.singletonList(decision2));
+ // fix some order for the test.
+ when(mockComparator.compare(eq(moveShardAction1), eq(moveShardAction2))).thenReturn(-1);
+
+ Decision decision = testCollator.operate();
+
+ assertEquals(1, decision.getActions().size());
+ assertEquals(moveShardAction2, decision.getActions().get(0));
+ }
+
+ @Test
+ public void testCollatorCyclicImpactDecisions() {
+ when(decision1.getActions()).thenReturn(Arrays.asList(moveShardAction1, moveShardAction3));
+ when(mockDecider1.getFlowUnits()).thenReturn(Collections.singletonList(decision1));
+ when(mockDecider2.getFlowUnits()).thenReturn(Collections.singletonList(decision2));
+ when(mockComparator.compare(any(Action.class), any(Action.class))).thenReturn(0);
+ this.testCollator = new Collator(impactAssessor, new ImpactBasedActionComparator(),
+ mockDecider1, mockDecider2);
+
+ Decision decision = testCollator.operate();
+
+ assertEquals(1, decision.getActions().size());
+ assertEquals(moveShardAction3, decision.getActions().get(0));
+ }
+
+ @Test
+ public void testCollatorEmptyActions() {
+ when(decision1.getActions()).thenReturn(Collections.emptyList());
+ when(decision2.getActions()).thenReturn(Collections.emptyList());
+ when(mockDecider1.getFlowUnits()).thenReturn(Collections.singletonList(decision1));
+ when(mockDecider2.getFlowUnits()).thenReturn(Collections.singletonList(decision2));
+ this.testCollator = new Collator(impactAssessor, new ImpactBasedActionComparator(),
+ mockDecider1, mockDecider2);
+
+ Decision decision = testCollator.operate();
+
+ assertTrue(decision.getActions().isEmpty());
+ }
+
+ @Test
+ public void testNoDeciders() {
+ testCollator = new Collator(impactAssessor, mockComparator);
+
+ final Decision decision = testCollator.operate();
+
+ assertTrue(decision.getActions().isEmpty());
+ }
+
+ public void setupDecisions() {
+ when(decision1.getActions())
+ .thenReturn(Collections.singletonList(moveShardAction1));
+ when(decision2.getActions())
+ .thenReturn(Collections.singletonList(moveShardAction2));
+ }
+
+ public void setupActions() {
+ when(moveShardAction1.name()).thenReturn(moveShardActionName);
+ when(moveShardAction1.impact()).thenReturn(moveShardImpact1);
+ when(moveShardAction1.impactedNodes()).thenReturn(new ArrayList<>(moveShardImpact1.keySet()));
+
+ when(moveShardAction2.name()).thenReturn(moveShardActionName);
+ when(moveShardAction2.impact()).thenReturn(moveShardImpact2);
+ when(moveShardAction2.impactedNodes()).thenReturn(new ArrayList<>(moveShardImpact2.keySet()));
+
+ when(moveShardAction3.name()).thenReturn(moveShardActionName);
+ when(moveShardAction3.impact()).thenReturn(moveShardImpact3);
+ when(moveShardAction3.impactedNodes()).thenReturn(new ArrayList<>(moveShardImpact3.keySet()));
+ }
+
+ public ImpactVector buildShardMoveOutImpactVector() {
+ final ImpactVector impactVector = new ImpactVector();
+ impactVector.decreasesPressure(Dimension.CPU);
+ impactVector.decreasesPressure(Dimension.HEAP);
+
+ return impactVector;
+ }
+
+ public ImpactVector buildShardMoveInImpactVector() {
+ final ImpactVector impactVector = new ImpactVector();
+ impactVector.increasesPressure(Dimension.CPU);
+ impactVector.increasesPressure(Dimension.HEAP);
+
+ return impactVector;
+ }
+}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/ImpactAssessorTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/ImpactAssessorTest.java
new file mode 100644
index 000000000..3fff0a6be
--- /dev/null
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/collator/ImpactAssessorTest.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails.Id;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails.Ip;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public class ImpactAssessorTest {
+
+ @Mock
+ private Action mockAction;
+
+ @Mock
+ private ImpactAssessment mockAssessment;
+
+ private ImpactAssessor testAssessor;
+
+ @Before
+ public void setup() {
+ initMocks(this);
+ when(mockAction.impactedNodes()).thenReturn(Collections.singletonList(new NodeKey(new Id(
+ "other node"), new Ip("2.2.3.4"))));
+ this.testAssessor = new ImpactAssessor();
+ }
+
+ @Test
+ public void testIsImpactAlignedNodeMissingFromOverallImpact() {
+ NodeKey nodeKey = new NodeKey(new Id("this node"), new Ip("1.2.3.4"));
+ Map testOverallAssessment = new HashMap<>();
+ testOverallAssessment
+ .put(nodeKey, new ImpactAssessment(nodeKey));
+
+ boolean isAligned = testAssessor.isImpactAligned(mockAction, testOverallAssessment);
+
+ assertFalse(isAligned);
+ }
+
+ @Test
+ public void testUndoActionImpactOnOverallAssessmentNodeMissing() {
+ NodeKey nodeKey = new NodeKey(new Id("this node"), new Ip("1.2.3.4"));
+ Map testOverallAssessment = new HashMap<>();
+ testOverallAssessment
+ .put(nodeKey, mockAssessment);
+
+ testAssessor.undoActionImpactOnOverallAssessment(mockAction, testOverallAssessment);
+
+ verify(mockAssessment, times(0)).removeActionImpact(anyString(), any());
+ }
+}