Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Refreshing from Mainline
Browse files Browse the repository at this point in the history
  • Loading branch information
khushbr committed Jul 7, 2020
1 parent d1b3f21 commit e72afa1
Show file tree
Hide file tree
Showing 14 changed files with 1,031 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;

import java.util.List;
import java.util.Map;

public interface Action {

/**
* Returns true if the configured action is actionable, false otherwise.
*
* <p>Examples of non-actionable actions are resource configurations where limits have been
* reached.
*/
boolean isActionable();

/** Time to wait since last recommendation, before suggesting this action again */
int coolOffPeriodInSeconds();

/**
* Called when the action is invoked.
*
* <p>Specific implementation may include executing the action, or invoking downstream APIs
*/
void execute();

/** Returns a list of Elasticsearch nodes impacted by this action. */
List<NodeKey> impactedNodes();

/** Returns a map of Elasticsearch nodes to ImpactVector of this action on that node */
Map<NodeKey, ImpactVector> impact();

/** Returns action name */
String name();

/** Returns a summary for the configured action */
String summary();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImpactVector {

public enum Dimension {
HEAP,
CPU,
RAM,
DISK,
NETWORK
}

public enum Impact {
NO_IMPACT,
INCREASES_PRESSURE,
DECREASES_PRESSURE
}

private Map<Dimension, Impact> impactMap = new HashMap<>();

public ImpactVector() {
for (Dimension d : Dimension.values()) {
impactMap.put(d, Impact.NO_IMPACT);
}
}

public Map<Dimension, Impact> getImpact() {
return Collections.unmodifiableMap(impactMap);
}

public void increasesPressure(Dimension... dimensions) {
for (Dimension dimension : dimensions) {
impactMap.put(dimension, Impact.INCREASES_PRESSURE);
}
}

public void decreasesPressure(Dimension... dimensions) {
for (Dimension dimension : dimensions) {
impactMap.put(dimension, Impact.DECREASES_PRESSURE);
}
}

public void noImpact(Dimension... dimensions) {
for (Dimension dimension : dimensions) {
impactMap.put(dimension, Impact.NO_IMPACT);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.CPU;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.HEAP;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.NETWORK;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ModifyQueueCapacityAction implements Action {

public static final String NAME = "modify_queue_capacity";
public static final int COOL_OFF_PERIOD_IN_SECONDS = 300;

private int currentCapacity;
private int desiredCapacity;
private ResourceEnum threadPool;
private NodeKey esNode;

private Map<ResourceEnum, Integer> lowerBound = new HashMap<>();
private Map<ResourceEnum, Integer> upperBound = new HashMap<>();

public ModifyQueueCapacityAction(NodeKey esNode, ResourceEnum threadPool, int currentCapacity, boolean increase) {
setBounds();
int STEP_SIZE = 50;
this.esNode = esNode;
this.threadPool = threadPool;
this.currentCapacity = currentCapacity;
int desiredCapacity = increase ? currentCapacity + STEP_SIZE : currentCapacity - STEP_SIZE;
setDesiredCapacity(desiredCapacity);
}

@Override
public String name() {
return NAME;
}

@Override
public boolean isActionable() {
return desiredCapacity != currentCapacity;
}

@Override
public int coolOffPeriodInSeconds() {
return COOL_OFF_PERIOD_IN_SECONDS;
}

@Override
public List<NodeKey> impactedNodes() {
return Collections.singletonList(esNode);
}

@Override
public Map<NodeKey, ImpactVector> impact() {
ImpactVector impactVector = new ImpactVector();
if (desiredCapacity > currentCapacity) {
impactVector.increasesPressure(HEAP, CPU, NETWORK);
} else if (desiredCapacity < currentCapacity) {
impactVector.decreasesPressure(HEAP, CPU, NETWORK);
}
return Collections.singletonMap(esNode, impactVector);
}

@Override
public void execute() {
// Making this a no-op for now
// TODO: Modify based on downstream agent API calls
assert true;
}

@Override
public String summary() {
if (!isActionable()) {
return String.format("No action to take for: [%s]", NAME);
}
return String.format("Update [%s] queue capacity from [%d] to [%d] on node [%s]",
threadPool.toString(), currentCapacity, desiredCapacity, esNode.getNodeId());
}

@Override
public String toString() {
return summary();
}

private void setBounds() {
// This is intentionally not made static because different nodes can
// have different bounds based on instance types
// TODO: Move configuration values to rca.conf

// Write thread pool for bulk write requests
this.lowerBound.put(ResourceEnum.WRITE_THREADPOOL, 100);
this.upperBound.put(ResourceEnum.WRITE_THREADPOOL, 1000);

// Search thread pool
this.lowerBound.put(ResourceEnum.SEARCH_THREADPOOL, 1000);
this.upperBound.put(ResourceEnum.SEARCH_THREADPOOL, 3000);
}

private void setDesiredCapacity(int desiredCapacity) {
this.desiredCapacity = Math.max(Math.min(desiredCapacity, upperBound.get(threadPool)), lowerBound.get(threadPool));
}

public int getCurrentCapacity() {
return currentCapacity;
}

public int getDesiredCapacity() {
return desiredCapacity;
}

public ResourceEnum getThreadPool() {
return threadPool;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;

import java.util.Arrays;
import java.util.List;

/**
* Collator collects and prunes the candidate decisions from each decider so that their impacts are
* aligned
*
* <p>Decisions can increase or decrease pressure on different key resources on an Elasticearch
* node. This is encapsulated in each Action via the {@link ImpactVector}. Since each decider
* independently evaluates its decision, it is possible to have conflicting ImpactVectors from
* actions across deciders.
*
* <p>The collator prunes them to ensure we only take actions that either increase, or decrease
* pressure on a particular node. To resolve conflicts, we prefer stability over performance.
*/
public class Collator extends Decider {

public static final String NAME = "collator";

/* Deciders can choose to publish decisions at different frequencies based on the
* type of resources monitored and rca signals. The collator should however, not introduce any
* unnecessary delays. As soon as a decision is taken, it should be evaluated and published downstream.
*/
private static final int collatorFrequency = 1; // Measured in terms of number of evaluationIntervalPeriods

private List<Decider> deciders;

public Collator(long evalIntervalSeconds, Decider... deciders) {
super(evalIntervalSeconds, collatorFrequency);
this.deciders = Arrays.asList(deciders);
}

@Override
public String name() {
return NAME;
}

@Override
public Decision operate() {
// This is a simple pass-through collator implementation
// TODO: Prune actions by their ImpactVectors

Decision finalDecision = new Decision(System.currentTimeMillis(), NAME);
for (Decider decider : deciders) {
Decision decision = decider.getFlowUnits().get(0);
finalDecision.addAllActions(decision.getActions());
}
return finalDecision;
}
}
Loading

0 comments on commit e72afa1

Please sign in to comment.