diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/Action.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/Action.java index 1b4346ecf..dc2a65465 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/Action.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/Action.java @@ -31,7 +31,7 @@ public interface Action { boolean isActionable(); /** Time to wait since last recommendation, before suggesting this action again */ - int coolOffPeriodInSeconds(); + long coolOffPeriodInMillis(); /** * Called when the action is invoked. diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyQueueCapacityAction.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyQueueCapacityAction.java index 38a960e99..9523ff493 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyQueueCapacityAction.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyQueueCapacityAction.java @@ -30,7 +30,7 @@ public class ModifyQueueCapacityAction implements Action { public static final String NAME = "modify_queue_capacity"; - public static final int COOL_OFF_PERIOD_IN_SECONDS = 300; + public static final long COOL_OFF_PERIOD_IN_MILLIS = 300 * 1_000; private int currentCapacity; private int desiredCapacity; @@ -61,8 +61,8 @@ public boolean isActionable() { } @Override - public int coolOffPeriodInSeconds() { - return COOL_OFF_PERIOD_IN_SECONDS; + public long coolOffPeriodInMillis() { + return COOL_OFF_PERIOD_IN_MILLIS; } @Override diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/Publisher.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/Publisher.java index a3ba9d8f6..61304ad93 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/Publisher.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/Publisher.java @@ -21,32 +21,63 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class Publisher extends NonLeafNode { private static final Logger LOG = LogManager.getLogger(Publisher.class); + private final long initTime; private Collator collator; private boolean isMuted = false; + private Map actionToExecutionTime; public Publisher(int evalIntervalSeconds, Collator collator) { super(0, evalIntervalSeconds); this.collator = collator; + this.actionToExecutionTime = new HashMap<>(); + initTime = Instant.now().toEpochMilli(); + } + + /** + * Returns true if a given {@link Action}'s last execution time was >= {@link Action#coolOffPeriodInMillis()} ago + * + *

If this Publisher has never executed the action, the last execution time is defined as the time that the publisher + * object was constructed. + * + * @param action The {@link Action} to test + * @return true if a given {@link Action}'s last execution time was >= {@link Action#coolOffPeriodInMillis()} ago + */ + public boolean isCooledOff(Action action) { + long lastExecution = actionToExecutionTime.getOrDefault(action.name(), initTime); + long elapsed = Instant.now().toEpochMilli() - lastExecution; + if (elapsed >= action.coolOffPeriodInMillis()) { + return true; + } else { + LOG.debug("Publisher: Action {} still has {} ms left in its cool off period", + action.name(), + action.coolOffPeriodInMillis() - elapsed); + return false; + } } @Override public EmptyFlowUnit operate() { - // TODO: Pass through implementation, need to add dampening, cool-off, action flip-flop + // TODO: Pass through implementation, need to add dampening, action flip-flop // avoidance, state persistence etc. - Decision decision = collator.getFlowUnits().get(0); for (Action action : decision.getActions()) { - LOG.info("Executing action: [{}]", action.name()); - action.execute(); + if (isCooledOff(action)) { // Only execute actions which have passed their cool off period + LOG.info("Publisher: Executing action: [{}]", action.name()); + action.execute(); + actionToExecutionTime.put(action.name(), Instant.now().toEpochMilli()); + } } - return new EmptyFlowUnit(System.currentTimeMillis()); + return new EmptyFlowUnit(Instant.now().toEpochMilli()); } @Override @@ -84,4 +115,8 @@ public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { public void handleNodeMuted() { assert true; } + + public long getInitTime() { + return this.initTime; + } } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyQueueCapacityActionTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyQueueCapacityActionTest.java index ce5d75e3b..cae8b3e2c 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyQueueCapacityActionTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/actions/ModifyQueueCapacityActionTest.java @@ -39,7 +39,8 @@ public void testIncreaseCapacity() { ModifyQueueCapacityAction modifyQueueCapacityAction = new ModifyQueueCapacityAction(node1, ResourceEnum.WRITE_THREADPOOL, 300, true); assertTrue(modifyQueueCapacityAction.getDesiredCapacity() > modifyQueueCapacityAction.getCurrentCapacity()); assertTrue(modifyQueueCapacityAction.isActionable()); - assertEquals(300, modifyQueueCapacityAction.coolOffPeriodInSeconds()); + assertEquals(ModifyQueueCapacityAction.COOL_OFF_PERIOD_IN_MILLIS, + modifyQueueCapacityAction.coolOffPeriodInMillis()); assertEquals(ResourceEnum.WRITE_THREADPOOL, modifyQueueCapacityAction.getThreadPool()); assertEquals(1, modifyQueueCapacityAction.impactedNodes().size()); @@ -57,7 +58,8 @@ public void testDecreaseCapacity() { ModifyQueueCapacityAction modifyQueueCapacityAction = new ModifyQueueCapacityAction(node1, ResourceEnum.SEARCH_THREADPOOL, 1500, false); assertTrue(modifyQueueCapacityAction.getDesiredCapacity() < modifyQueueCapacityAction.getCurrentCapacity()); assertTrue(modifyQueueCapacityAction.isActionable()); - assertEquals(300, modifyQueueCapacityAction.coolOffPeriodInSeconds()); + assertEquals(ModifyQueueCapacityAction.COOL_OFF_PERIOD_IN_MILLIS, + modifyQueueCapacityAction.coolOffPeriodInMillis()); assertEquals(ResourceEnum.SEARCH_THREADPOOL, modifyQueueCapacityAction.getThreadPool()); assertEquals(1, modifyQueueCapacityAction.impactedNodes().size()); diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/PublisherTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/PublisherTest.java new file mode 100644 index 000000000..fe6fdaeb3 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/decisionmaker/deciders/PublisherTest.java @@ -0,0 +1,84 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action; + +import com.google.common.collect.Lists; + +import java.time.Instant; +import java.util.List; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +public class PublisherTest { + private static final int EVAL_INTERVAL_S = 5; + private static Publisher publisher; + + // Mock objects + @Mock + private Collator collator; + + @Mock + private Decision decision; + + @Mock + private Action action; + + private static class TestDecider extends Decider { + public TestDecider(long evalIntervalSeconds, int decisionFrequency) { + super(evalIntervalSeconds, decisionFrequency); + } + + @Override + public String name() { + return getClass().getSimpleName(); + } + + @Override + public Decision operate() { + return null; + } + } + + @BeforeClass + public static void setupClass() { + } + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + publisher = new Publisher(EVAL_INTERVAL_S, collator); + } + + @Test + public void testIsCooledOff() throws Exception { + List decisionList = Lists.newArrayList(decision); + Mockito.when(collator.getFlowUnits()).thenReturn(decisionList); + Mockito.when(decision.getActions()).thenReturn(Lists.newArrayList(action)); + Mockito.when(action.name()).thenReturn("testIsCooledOffAction"); + Mockito.when(action.coolOffPeriodInMillis()).thenReturn(100_000L); + // Verify that a newly initialized publisher doesn't execute an action until the publisher object + // has been alive for longer than the action's cool off period + publisher.operate(); + Mockito.verify(action, Mockito.times(0)).execute(); + Mockito.when(action.coolOffPeriodInMillis()).thenReturn(Instant.now().toEpochMilli() + - publisher.getInitTime() - 1000L); + publisher.operate(); + Mockito.verify(action, Mockito.times(1)).execute(); + Mockito.reset(action); + // Verify that a publisher doesn't execute a previously executed action until the action's cool off period + // has elapsed + Mockito.when(action.coolOffPeriodInMillis()).thenReturn(3000L); + publisher.operate(); + Mockito.verify(action, Mockito.times(0)).execute(); + // Verify that a published executes a previously executed action once the action's cool off period has elapsed + Thread.sleep(4000L); + publisher.operate(); + Mockito.verify(action, Mockito.times(1)).execute(); + } +}