Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Implement cool off handling for the Publisher (#272)
Browse files Browse the repository at this point in the history
* Implement cool off handling for the Publisher

The Publisher shouldn't spam the same action over and over again. It
should wait for a specified period of time before repeating an action.

This commit implements this logic.

* Fixup Javadoc styling for Publisher

* Address PR comments for Publisher cool off

* Respond to PR comments
  • Loading branch information
Sid Narayan authored Jul 21, 2020
1 parent 1b6c9a9 commit dbd8675
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface Action {
boolean isActionable();

/** Time to wait since last recommendation, before suggesting this action again */
int coolOffPeriodInSeconds();
long coolOffPeriodInMillis();

/**
* Called when the action is invoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class ModifyQueueCapacityAction implements Action {

public static final String NAME = "modify_queue_capacity";
public static final int COOL_OFF_PERIOD_IN_SECONDS = 300;
public static final long COOL_OFF_PERIOD_IN_MILLIS = 300 * 1_000;

private int currentCapacity;
private int desiredCapacity;
Expand Down Expand Up @@ -61,8 +61,8 @@ public boolean isActionable() {
}

@Override
public int coolOffPeriodInSeconds() {
return COOL_OFF_PERIOD_IN_SECONDS;
public long coolOffPeriodInMillis() {
return COOL_OFF_PERIOD_IN_MILLIS;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,63 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Publisher extends NonLeafNode<EmptyFlowUnit> {

private static final Logger LOG = LogManager.getLogger(Publisher.class);
private final long initTime;

private Collator collator;
private boolean isMuted = false;
private Map<String, Long> actionToExecutionTime;

public Publisher(int evalIntervalSeconds, Collator collator) {
super(0, evalIntervalSeconds);
this.collator = collator;
this.actionToExecutionTime = new HashMap<>();
initTime = Instant.now().toEpochMilli();
}

/**
* Returns true if a given {@link Action}'s last execution time was >= {@link Action#coolOffPeriodInMillis()} ago
*
* <p>If this Publisher has never executed the action, the last execution time is defined as the time that the publisher
* object was constructed.
*
* @param action The {@link Action} to test
* @return true if a given {@link Action}'s last execution time was >= {@link Action#coolOffPeriodInMillis()} ago
*/
public boolean isCooledOff(Action action) {
long lastExecution = actionToExecutionTime.getOrDefault(action.name(), initTime);
long elapsed = Instant.now().toEpochMilli() - lastExecution;
if (elapsed >= action.coolOffPeriodInMillis()) {
return true;
} else {
LOG.debug("Publisher: Action {} still has {} ms left in its cool off period",
action.name(),
action.coolOffPeriodInMillis() - elapsed);
return false;
}
}

@Override
public EmptyFlowUnit operate() {
// TODO: Pass through implementation, need to add dampening, cool-off, action flip-flop
// TODO: Pass through implementation, need to add dampening, action flip-flop
// avoidance, state persistence etc.

Decision decision = collator.getFlowUnits().get(0);
for (Action action : decision.getActions()) {
LOG.info("Executing action: [{}]", action.name());
action.execute();
if (isCooledOff(action)) { // Only execute actions which have passed their cool off period
LOG.info("Publisher: Executing action: [{}]", action.name());
action.execute();
actionToExecutionTime.put(action.name(), Instant.now().toEpochMilli());
}
}
return new EmptyFlowUnit(System.currentTimeMillis());
return new EmptyFlowUnit(Instant.now().toEpochMilli());
}

@Override
Expand Down Expand Up @@ -84,4 +115,8 @@ public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
public void handleNodeMuted() {
assert true;
}

public long getInitTime() {
return this.initTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public void testIncreaseCapacity() {
ModifyQueueCapacityAction modifyQueueCapacityAction = new ModifyQueueCapacityAction(node1, ResourceEnum.WRITE_THREADPOOL, 300, true);
assertTrue(modifyQueueCapacityAction.getDesiredCapacity() > modifyQueueCapacityAction.getCurrentCapacity());
assertTrue(modifyQueueCapacityAction.isActionable());
assertEquals(300, modifyQueueCapacityAction.coolOffPeriodInSeconds());
assertEquals(ModifyQueueCapacityAction.COOL_OFF_PERIOD_IN_MILLIS,
modifyQueueCapacityAction.coolOffPeriodInMillis());
assertEquals(ResourceEnum.WRITE_THREADPOOL, modifyQueueCapacityAction.getThreadPool());
assertEquals(1, modifyQueueCapacityAction.impactedNodes().size());

Expand All @@ -57,7 +58,8 @@ public void testDecreaseCapacity() {
ModifyQueueCapacityAction modifyQueueCapacityAction = new ModifyQueueCapacityAction(node1, ResourceEnum.SEARCH_THREADPOOL, 1500, false);
assertTrue(modifyQueueCapacityAction.getDesiredCapacity() < modifyQueueCapacityAction.getCurrentCapacity());
assertTrue(modifyQueueCapacityAction.isActionable());
assertEquals(300, modifyQueueCapacityAction.coolOffPeriodInSeconds());
assertEquals(ModifyQueueCapacityAction.COOL_OFF_PERIOD_IN_MILLIS,
modifyQueueCapacityAction.coolOffPeriodInMillis());
assertEquals(ResourceEnum.SEARCH_THREADPOOL, modifyQueueCapacityAction.getThreadPool());
assertEquals(1, modifyQueueCapacityAction.impactedNodes().size());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;

import com.google.common.collect.Lists;

import java.time.Instant;
import java.util.List;

import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

public class PublisherTest {
private static final int EVAL_INTERVAL_S = 5;
private static Publisher publisher;

// Mock objects
@Mock
private Collator collator;

@Mock
private Decision decision;

@Mock
private Action action;

private static class TestDecider extends Decider {
public TestDecider(long evalIntervalSeconds, int decisionFrequency) {
super(evalIntervalSeconds, decisionFrequency);
}

@Override
public String name() {
return getClass().getSimpleName();
}

@Override
public Decision operate() {
return null;
}
}

@BeforeClass
public static void setupClass() {
}

@Before
public void setup() {
MockitoAnnotations.initMocks(this);
publisher = new Publisher(EVAL_INTERVAL_S, collator);
}

@Test
public void testIsCooledOff() throws Exception {
List<Decision> decisionList = Lists.newArrayList(decision);
Mockito.when(collator.getFlowUnits()).thenReturn(decisionList);
Mockito.when(decision.getActions()).thenReturn(Lists.newArrayList(action));
Mockito.when(action.name()).thenReturn("testIsCooledOffAction");
Mockito.when(action.coolOffPeriodInMillis()).thenReturn(100_000L);
// Verify that a newly initialized publisher doesn't execute an action until the publisher object
// has been alive for longer than the action's cool off period
publisher.operate();
Mockito.verify(action, Mockito.times(0)).execute();
Mockito.when(action.coolOffPeriodInMillis()).thenReturn(Instant.now().toEpochMilli()
- publisher.getInitTime() - 1000L);
publisher.operate();
Mockito.verify(action, Mockito.times(1)).execute();
Mockito.reset(action);
// Verify that a publisher doesn't execute a previously executed action until the action's cool off period
// has elapsed
Mockito.when(action.coolOffPeriodInMillis()).thenReturn(3000L);
publisher.operate();
Mockito.verify(action, Mockito.times(0)).execute();
// Verify that a published executes a previously executed action once the action's cool off period has elapsed
Thread.sleep(4000L);
publisher.operate();
Mockito.verify(action, Mockito.times(1)).execute();
}
}

0 comments on commit dbd8675

Please sign in to comment.