Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Implement Action Flip Flop Detection in the Publisher #287

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.collections;
khushbr marked this conversation as resolved.
Show resolved Hide resolved

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

/**
* Caches a set of elements which are automatically evicted based on the cache TTL.
*
* <p>Subsequent calls to add with the same element refresh the expiry period for that element.
*/
public class TimeExpiringSet<E> implements Iterable<E> {
private Cache<E, E> cache;

/**
* Allocates a new TimeExpiringSet whose elements expire after the given time period
*
* <p>E.g. for a ttl of 5 and a unit of TimeUnit.SECONDS, a newly added element will remain
* in the Set for 5 seconds before it is evicted.
*
* @param ttl The magnitude of the time a unit will remain in the cache before it is evicted
* @param unit The unit of the ttl
*/
public TimeExpiringSet(long ttl, TimeUnit unit) {
cache = CacheBuilder.newBuilder()
.expireAfterWrite(ttl, unit)
.build();
}

/**
* Returns true if e is currently a member of the Set
* @param e The element to tests
* @return true if e is currently a member of the Set
*/
public boolean contains(E e) {
return cache.getIfPresent(e) != null;
}

/**
* Returns the number of elements currently in the Set
* @return the number of elements currently in the Set
*/
public long size() {
return cache.size();
}

/**
* Returns a weakly-consistent, thread-safe {@link Iterator} over the elements in the Set
*
* <p>This means that while the Iterator is thread-safe, if elements expire after the Iterator is
* created, the changes may not be reflected in the iteration. That is, you may iterate over an
* element which was invalidated during your iteration. This is okay for many use cases which can
* tolerate weak consistency.
*
* @return a weakly-consistent, thread-safe {@link Iterator} over the elements in the Set
*/
@Nonnull
public Iterator<E> iterator() {
return cache.asMap().keySet().iterator();
}

/**
* Adds an element into the Set
* @param e the element to add into the Set
*/
public void add(E e) {
cache.put(e, e);
}

/**
* Simple weakly-consistent forEach implementation applies the given action to each element
* @param action The action to apply to each element
*/
public void forEach(Consumer<? super E> action) {
iterator().forEachRemaining(action);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

/**
* Records a series of actions and can determine whether a subsequent action would "flip flop" with
* the previously recorded actions.
*
* <p>A flip flop is defined as any action which would undo changes made by a previous action. For
* example, decreasing CPU allocation then immediately increasing it may be considered a flip flop,
* but this is up to the implementation.
*/
public interface FlipFlopDetector {

/**
* Determines whether action will "flip flop" with any of the previously recorded actions
* @param action The {@link Action} to test
* @return True if action will "flip flop" with any of the previously recorded actions
*/
public boolean isFlipFlop(Action action);

/**
* Records that an action was applied. This action may then be used in any subsequent isFlipFlop
* tests
* @param action The action to record
*/
public void recordAction(Action action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ImpactVector {

Expand Down Expand Up @@ -64,4 +65,27 @@ public void noImpact(Dimension... dimensions) {
impactMap.put(dimension, Impact.NO_IMPACT);
}
}

/**
* Two ImpactVectors are equal if and only if they have the same impact for each of their
* dimensions
* @param o The other ImpactVector to compare with this
* @return true if and only if this and o have the same impact for each of their dimensions
*/
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ImpactVector that = (ImpactVector) o;
return Objects.equals(impactMap, that.impactMap);
}

@Override
public int hashCode() {
return Objects.hash(impactMap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.collections.TimeExpiringSet;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Impact;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* A {@link FlipFlopDetector} whose recorded actions expire after a given period of time.
*
* <p>This class defines a flip flop as an {@link Impact#DECREASES_PRESSURE}s followed by an
* {@link Impact#INCREASES_PRESSURE}s to be a flip flops.
*
* <p>This class stores a {@link TimeExpiringSet} of {@link ImpactVector}s per {@link NodeKey}
* that are used to determine these flip flops.
*/
public class TimedFlipFlopDetector implements FlipFlopDetector {
sidheart marked this conversation as resolved.
Show resolved Hide resolved
private Map<NodeKey, TimeExpiringSet<ImpactVector>> flipFlopMap;
private long expiryDuration;
private TimeUnit expiryUnit;

public TimedFlipFlopDetector(long duration, TimeUnit unit) {
flipFlopMap = new HashMap<>();
this.expiryDuration = duration;
this.expiryUnit = unit;
}

/**
* Tests if (prev, curr) is a flip flopping sequence of impacts.
*
* <p>Only an increase following a decrease is considered a flip flop. Therefore, if
* prev decreases pressure and curr increases pressure, then (prev, curr) is a flip flop.
*
* @param prev The {@link Impact} that curr is compared against
* @param curr The {@link Impact} that you'd like to test and apply
* @return Whether or not (prev, curr) is a flip flopping sequence of impacts
*/
protected boolean isFlipFlopImpact(Impact prev, Impact curr) {
return prev.equals(Impact.DECREASES_PRESSURE) && curr.equals(Impact.INCREASES_PRESSURE);
}

/**
* Returns true if the impact for any given Dimension in prev is a flip flop Impact when compared to
* the impact for a given dimension in prev
*
* <p>e.g. for prev = (HEAP: INCREASE, CPU: DECREASE), curr = (HEAP: DECREASE, CPU: INCREASE)
* (prev, curr) is a flip flop vector because a CPU: DECREASE followed by a CPU: INCREASE is a flip
* flop impact. Note that (HEAP: DECREASE) followed by (CPU: INCREASE) is not a flip flop
* because HEAP =/= CPU.
*
* @param prev The first {@link ImpactVector}. Its Impacts appear on the LHS of calls to
* {@link this#isFlipFlopImpact(Impact, Impact)}
* @param curr The second {@link ImpactVector}. Its Impacts appear on the RHS of calls to
* {@link this#isFlipFlopImpact(Impact, Impact)}.
* @return true if the impact for any given Dimension in curr is a flip flop Impact when compared to
* the impact for a given dimension in prev
*/
protected boolean isFlipFlopVector(ImpactVector prev, ImpactVector curr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we will need the resource name for which the flip flop happened ?
So, for for prev = (HEAP: INCREASE, CPU: INCREASE), curr = (HEAP: DECREASE, CPU: INCREASE), we can return HEAP instead of Boolean value ?

What I want to understand is if we use(or might use in future) this information anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's trivial to add this information in the future by extracting the logic in this function, I think we can table this until the need arises

Map<Dimension, Impact> currentImpact = curr.getImpact();
for (Map.Entry<Dimension, Impact> impactEntry : prev.getImpact().entrySet()) {
Dimension dim = impactEntry.getKey();
Impact vImpact = currentImpact.get(dim);
if (isFlipFlopImpact(impactEntry.getValue(), vImpact)) {
return true;
}
}
return false;
}

/**
* Records an action's various {@link ImpactVector}s so that they may be used for future flip
* flop tests
*
* @param action The action to record
*/
@Override
public void recordAction(Action action) {
for (Map.Entry<NodeKey, ImpactVector> entry : action.impact().entrySet()) {
flipFlopMap.compute(entry.getKey(), (k, v) -> {
if (v == null) {
v = new TimeExpiringSet<>(expiryDuration, expiryUnit);
}
v.add(entry.getValue());
return v;
});
}
}

/**
* Returns true if for any NodeKey, ImpactVector pair (k, v) in action, v clashes with any of
* the {@link ImpactVector}s currently associated with k.
*
* @param action The {@link Action} to test
* @return true if applying the action would cause a flip flop
*/
@Override
public boolean isFlipFlop(Action action) {
for (Map.Entry<NodeKey, ImpactVector> entry : action.impact().entrySet()) {
TimeExpiringSet<ImpactVector> previousImpacts = flipFlopMap.get(entry.getKey());
if (previousImpacts == null) {
continue;
}
// Weakly-consistent iteration over the previousImpacts
// If one of these impacts expires during our iteration we may incorrectly determine
// action to be a flip flop until the subsequent call of this function. This is OK for
// our use case since we're always erring on the side of stability
for (ImpactVector impactVector : previousImpacts) {
sidheart marked this conversation as resolved.
Show resolved Hide resolved
if (isFlipFlopVector(impactVector, entry.getValue())) {
return true;
}
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.FlipFlopDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.TimedFlipFlopDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import com.google.common.annotations.VisibleForTesting;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -33,13 +37,16 @@ public class Publisher extends NonLeafNode<EmptyFlowUnit> {
private final long initTime;

private Collator collator;
private FlipFlopDetector flipFlopDetector;
private boolean isMuted = false;
private Map<String, Long> actionToExecutionTime;

public Publisher(int evalIntervalSeconds, Collator collator) {
super(0, evalIntervalSeconds);
this.collator = collator;
this.actionToExecutionTime = new HashMap<>();
// TODO please bring in guice so we can configure this with DI
khushbr marked this conversation as resolved.
Show resolved Hide resolved
this.flipFlopDetector = new TimedFlipFlopDetector(1, TimeUnit.HOURS);
initTime = Instant.now().toEpochMilli();
}

Expand Down Expand Up @@ -67,13 +74,13 @@ public boolean isCooledOff(Action action) {

@Override
public EmptyFlowUnit operate() {
// TODO: Pass through implementation, need to add dampening, action flip-flop
// avoidance, state persistence etc.
// TODO: Need to add dampening, avoidance, state persistence etc.
Decision decision = collator.getFlowUnits().get(0);
for (Action action : decision.getActions()) {
if (isCooledOff(action)) { // Only execute actions which have passed their cool off period
if (isCooledOff(action) && !flipFlopDetector.isFlipFlop(action)) {
LOG.info("Publisher: Executing action: [{}]", action.name());
action.execute();
flipFlopDetector.recordAction(action);
actionToExecutionTime.put(action.name(), Instant.now().toEpochMilli());
}
}
Expand Down Expand Up @@ -119,4 +126,9 @@ public void handleNodeMuted() {
public long getInitTime() {
return this.initTime;
}

@VisibleForTesting
protected FlipFlopDetector getFlipFlopDetector() {
return this.flipFlopDetector;
}
}
Loading