Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

JVM decider #326

Merged
merged 11 commits into from
Sep 3, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension.HEAP;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cache.CacheUtil.KB_TO_BYTES;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cache.CacheUtil.MB_TO_BYTES;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.configs.CacheActionConfig;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.util.NodeConfigCacheReaderUtil;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -135,7 +132,6 @@ public ResourceEnum getCacheType() {
return cacheType;
}

@VisibleForTesting
// Converts a heap-relative threshold into an absolute number of bytes.
// The threshold is multiplied by the heap size in floating point and the
// product is then truncated toward zero by the cast to long.
public static long getThresholdInBytes(double threshold, long heapSize) {
  final double scaledBytes = threshold * heapSize;
  return (long) scaledBytes;
}
Expand All @@ -150,7 +146,7 @@ public static final class Builder {
private final AppContext appContext;
private final RcaConf rcaConf;

private long stepSizeInBytes;
private double stepSizeInPercent;
private boolean isIncrease;
private boolean canUpdate;
private long coolOffPeriodInMillis;
Expand Down Expand Up @@ -180,11 +176,11 @@ private Builder(
this.heapMaxSizeInBytes = NodeConfigCacheReaderUtil.readHeapMaxSizeInBytes(
appContext.getNodeConfigCache(), esNode);
this.desiredCacheMaxSizeInBytes = null;
setDefaultStepSize(cacheType);

CacheActionConfig cacheActionConfig = new CacheActionConfig(rcaConf);
double upperBoundThreshold = cacheActionConfig.getThresholdConfig(cacheType).upperBound();
double lowerBoundThreshold = cacheActionConfig.getThresholdConfig(cacheType).lowerBound();
this.stepSizeInPercent = cacheActionConfig.getStepSize(cacheType);
if (heapMaxSizeInBytes != null) {
this.upperBoundInBytes = getThresholdInBytes(upperBoundThreshold, heapMaxSizeInBytes);
this.lowerBoundInBytes = getThresholdInBytes(lowerBoundThreshold, heapMaxSizeInBytes);
Expand All @@ -195,23 +191,6 @@ private Builder(
}
}

/**
 * Assigns the hard-coded default step size for the given cache type to {@code stepSizeInBytes}.
 *
 * @param cacheType cache whose default step size should be applied
 * @throws IllegalArgumentException if the cache type is neither the field data cache nor the
 *     shard request cache
 */
private void setDefaultStepSize(ResourceEnum cacheType) {
  // TODO: Move configuration values to rca.conf
  // TODO: Update the step size to also include percentage of heap size along with absolute value
  if (cacheType == ResourceEnum.FIELD_DATA_CACHE) {
    // Field data cache: 512MB per step
    this.stepSizeInBytes = 512L * MB_TO_BYTES;
  } else if (cacheType == ResourceEnum.SHARD_REQUEST_CACHE) {
    // Shard request cache: 512KB per step
    this.stepSizeInBytes = 512L * KB_TO_BYTES;
  } else {
    throw new IllegalArgumentException(String.format("Unrecognizable cache type: [%s]", cacheType.toString()));
  }
}

public Builder coolOffPeriod(long coolOffPeriodInMillis) {
this.coolOffPeriodInMillis = coolOffPeriodInMillis;
return this;
Expand All @@ -232,8 +211,8 @@ public Builder setDesiredCacheMaxSizeToMax() {
return this;
}

public Builder stepSizeInBytes(long stepSizeInBytes) {
this.stepSizeInBytes = stepSizeInBytes;
public Builder stepSizeInPercent(double stepSizeInPercent) {
this.stepSizeInPercent = stepSizeInPercent;
return this;
}

Expand All @@ -245,6 +224,7 @@ public ModifyCacheMaxSizeAction build() {
-1, -1, coolOffPeriodInMillis, false);
}

long stepSizeInBytes = (long) (stepSizeInPercent * heapMaxSizeInBytes);
if (desiredCacheMaxSizeInBytes == null) {
desiredCacheMaxSizeInBytes = isIncrease ? currentCacheMaxSizeInBytes + stepSizeInBytes :
currentCacheMaxSizeInBytes - stepSizeInBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.configs.QueueActionConfig;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;

Expand Down Expand Up @@ -119,7 +118,6 @@ public ResourceEnum getThreadPool() {

public static final class Builder {
public static final long DEFAULT_COOL_OFF_PERIOD_IN_MILLIS = 300 * 1_000;
public static final int DEFAULT_STEP_SIZE = 50;
public static final boolean DEFAULT_IS_INCREASE = true;
public static final boolean DEFAULT_CAN_UPDATE = true;

Expand All @@ -143,7 +141,6 @@ public Builder(NodeKey esNode, ResourceEnum threadPool, final AppContext appCont
this.appContext = appContext;
this.rcaConf = conf;
this.coolOffPeriodInMillis = DEFAULT_COOL_OFF_PERIOD_IN_MILLIS;
this.stepSize = DEFAULT_STEP_SIZE;
this.increase = DEFAULT_IS_INCREASE;
this.canUpdate = DEFAULT_CAN_UPDATE;
this.desiredCapacity = null;
Expand All @@ -153,6 +150,7 @@ public Builder(NodeKey esNode, ResourceEnum threadPool, final AppContext appCont
QueueActionConfig queueActionConfig = new QueueActionConfig(rcaConf);
this.upperBound = queueActionConfig.getThresholdConfig(threadPool).upperBound();
this.lowerBound = queueActionConfig.getThresholdConfig(threadPool).lowerBound();
this.stepSize = queueActionConfig.getStepSize(threadPool);
}

public Builder coolOffPeriod(long coolOffPeriodInMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* "action-config-settings": {
* // Cache max size bounds are expressed as a fraction of the JVM heap size (e.g. 0.4 means 40%)
* "cache-settings": {
* "total-step-count": 20,
* "fielddata": {
* "upper-bound": 0.4,
* "lower-bound": 0.1
Expand All @@ -52,8 +53,11 @@ public class CacheActionConfig {
private NestedConfig cacheSettingsConfig;
private FieldDataCacheConfig fieldDataCacheConfig;
private ShardRequestCacheConfig shardRequestCacheConfig;
private Config<Integer> totalStepCount;
private Map<ResourceEnum, ThresholdConfig<Double>> thresholdConfigMap;

private static final String TOTAL_STEP_COUNT_CONFIG_NAME = "total-step-count";
public static final int DEFAULT_TOTAL_STEP_COUNT = 20;
public static final Double DEFAULT_FIELDDATA_CACHE_UPPER_BOUND = 0.4;
public static final Double DEFAULT_FIELDDATA_CACHE_LOWER_BOUND = 0.1;
public static final Double DEFAULT_SHARD_REQUEST_CACHE_UPPER_BOUND = 0.05;
Expand All @@ -64,9 +68,24 @@ public CacheActionConfig(RcaConf conf) {
cacheSettingsConfig = new NestedConfig("cache-settings", actionConfig);
fieldDataCacheConfig = new FieldDataCacheConfig(cacheSettingsConfig);
shardRequestCacheConfig = new ShardRequestCacheConfig(cacheSettingsConfig);
totalStepCount = new Config<>(TOTAL_STEP_COUNT_CONFIG_NAME, cacheSettingsConfig.getValue(),
DEFAULT_TOTAL_STEP_COUNT, (s) -> (s > 0), Integer.class);
createThresholdConfigMap();
}

/**
 * Returns the value held by the {@code total-step-count} cache setting.
 *
 * @return the total number of steps used when deriving a cache step size
 */
public int getTotalStepCount() {
  final int configuredSteps = totalStepCount.getValue();
  return configuredSteps;
}

/**
 * Computes the size of a single step for the given cache type by dividing the configured
 * range (upper bound minus lower bound) by the total step count.
 *
 * @param cacheType cache whose threshold config supplies the bounds
 * @return the step size, expressed in the same unit as the threshold bounds
 */
public double getStepSize(ResourceEnum cacheType) {
  final ThresholdConfig<Double> bounds = getThresholdConfig(cacheType);
  final double range = bounds.upperBound() - bounds.lowerBound();
  return range / (double) getTotalStepCount();
}
Comment on lines +84 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


public ThresholdConfig<Double> getThresholdConfig(ResourceEnum cacheType) {
if (!thresholdConfigMap.containsKey(cacheType)) {
String msg = "Threshold config requested for unknown cache type: " + cacheType.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* "action-config-settings": {
* // Queue Capacity bounds are expressed as absolute queue size
* "queue-settings": {
* "total-step-count": 20,
* "search": {
* "upper-bound": 3000,
* "lower-bound": 1000
Expand All @@ -51,8 +52,11 @@ public class QueueActionConfig {
private NestedConfig queueSettingsConfig;
private SearchQueueConfig searchQueueConfig;
private WriteQueueConfig writeQueueConfig;
private Config<Integer> totalStepCount;
private Map<ResourceEnum, ThresholdConfig<Integer>> thresholdConfigMap;

private static final String TOTAL_STEP_COUNT_CONFIG_NAME = "total-step-count";
public static final int DEFAULT_TOTAL_STEP_COUNT = 20;
public static final int DEFAULT_SEARCH_QUEUE_UPPER_BOUND = 3000;
public static final int DEFAULT_SEARCH_QUEUE_LOWER_BOUND = 500;
public static final int DEFAULT_WRITE_QUEUE_UPPER_BOUND = 1000;
Expand All @@ -63,9 +67,24 @@ public QueueActionConfig(RcaConf conf) {
queueSettingsConfig = new NestedConfig("queue-settings", actionConfig);
searchQueueConfig = new SearchQueueConfig(queueSettingsConfig);
writeQueueConfig = new WriteQueueConfig(queueSettingsConfig);
totalStepCount = new Config<>(TOTAL_STEP_COUNT_CONFIG_NAME, queueSettingsConfig.getValue(),
DEFAULT_TOTAL_STEP_COUNT, (s) -> (s > 0), Integer.class);
createThresholdConfigMap();
}

/**
 * Returns the value held by the {@code total-step-count} queue setting.
 *
 * @return the total number of steps used when deriving a queue step size
 */
public int getTotalStepCount() {
  final int configuredSteps = totalStepCount.getValue();
  return configuredSteps;
}

/**
 * Computes the size of a single step for the given thread pool queue by dividing the
 * configured range (upper bound minus lower bound) by the total step count.
 *
 * <p>The quotient is truncated toward zero by the cast to {@code int}, so the result is 0
 * whenever the range is smaller than the total step count.
 *
 * @param threadPool thread pool whose threshold config supplies the bounds
 * @return the step size as a whole number of queue slots
 */
public int getStepSize(ResourceEnum threadPool) {
  final ThresholdConfig<Integer> bounds = getThresholdConfig(threadPool);
  final int range = bounds.upperBound() - bounds.lowerBound();
  final double exactStep = range / (double) getTotalStepCount();
  return (int) exactStep;
}

public ThresholdConfig<Integer> getThresholdConfig(ResourceEnum threadPool) {
if (!thresholdConfigMap.containsKey(threadPool)) {
String msg = "Threshold config requested for unknown threadpool queue: " + threadPool.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,12 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.configs.DeciderConfig.getDefaultCachePriority;
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.configs.DeciderConfig.getDefaultWorkloadPriority;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.configs.DeciderConfig;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -48,13 +43,11 @@ public abstract class Decider extends NonLeafNode<Decision> {
private static final Logger LOG = LogManager.getLogger(Decider.class);
protected final int decisionFrequency; // Measured in terms of number of evaluationIntervalPeriods
protected RcaConf rcaConf;
DeciderConfig configObj;

/**
 * Creates a decider, passing {@code 0} and {@code evalIntervalSeconds} to the parent
 * constructor.
 *
 * @param evalIntervalSeconds evaluation interval forwarded to the parent node
 * @param decisionFrequency number of evaluation interval periods between decisions
 */
public Decider(long evalIntervalSeconds, int decisionFrequency) {
  super(0, evalIntervalSeconds);
  // Both configuration handles start out unset; readRcaConf assigns them later.
  this.configObj = null;
  this.rcaConf = null;
  this.decisionFrequency = decisionFrequency;
}

public abstract String name();
Expand Down Expand Up @@ -109,15 +102,5 @@ public void handleNodeMuted() {
/**
 * Stores the supplied RCA configuration and the decider config obtained from it.
 *
 * @param conf the RCA configuration to read from
 */
@Override
public void readRcaConf(RcaConf conf) {
  this.rcaConf = conf;
  this.configObj = conf.getDeciderConfig();
}

/**
 * Returns the workload priority order from the decider config, or the default order when no
 * decider config has been set.
 *
 * @return the workload priority order
 */
public List<String> getWorkLoadPriority() {
  if (configObj == null) {
    return getDefaultWorkloadPriority();
  }
  return configObj.getWorkloadPriorityOrder();
}

/**
 * Returns the cache priority order from the decider config, or the default order when no
 * decider config has been set.
 *
 * @return the cache priority order
 */
public List<String> getCachePriority() {
  if (configObj == null) {
    return getDefaultCachePriority();
  }
  return configObj.getCachePriorityOrder();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.jvm;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Decider;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Decision;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.jvm.old_gen.OldGenDecisionPolicy;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.HighHeapUsageClusterRca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.List;

/**
 * Decider that proposes actions to bring down heap usage in young gen / old gen.
 *
 * <p>Every {@code decisionFrequency}-th call to {@link #operate()} inspects the first flow unit
 * of the {@link HighHeapUsageClusterRca}. For each hot node that reports
 * {@link ResourceUtil#OLD_GEN_HEAP_USAGE}, it asks an {@link OldGenDecisionPolicy} for actions
 * and adds them to the returned {@link Decision}.
 */
public class HeapHealthDecider extends Decider {

  public static final String NAME = "HeapHealthDecider";
  private final HighHeapUsageClusterRca highHeapUsageClusterRca;
  // Number of operate() calls since the last decision was evaluated.
  private int counter = 0;

  /**
   * Creates the decider.
   *
   * @param decisionFrequency number of {@link #operate()} calls between two evaluations
   * @param highHeapUsageClusterRca upstream RCA whose flow units are inspected
   */
  public HeapHealthDecider(int decisionFrequency, final HighHeapUsageClusterRca highHeapUsageClusterRca) {
    //TODO : refactor parent class to remove evalIntervalSeconds completely
    super(5, decisionFrequency);
    this.highHeapUsageClusterRca = highHeapUsageClusterRca;
    // The decision policy is deliberately NOT created here: the parent constructor has just
    // reset rcaConf to null, so a policy built at this point would never see the real config.
  }

  @Override
  public String name() {
    return NAME;
  }

  /**
   * Evaluates the old gen policy once every {@code decisionFrequency} invocations.
   *
   * @return a decision that is empty when it is not yet time to decide, when the RCA has no
   *     flow units, or when the first flow unit has no resource summary; otherwise a decision
   *     holding the actions suggested for every node reporting old gen heap usage
   */
  @Override
  public Decision operate() {
    Decision decision = new Decision(System.currentTimeMillis(), NAME);
    counter += 1;
    if (counter < decisionFrequency) {
      return decision;
    }

    counter = 0;
    if (highHeapUsageClusterRca.getFlowUnits().isEmpty()) {
      return decision;
    }

    ResourceFlowUnit<HotClusterSummary> flowUnit = highHeapUsageClusterRca.getFlowUnits().get(0);
    if (!flowUnit.hasResourceSummary()) {
      return decision;
    }

    // Build the policy at decision time so it receives the rcaConf assigned by readRcaConf
    // and the app context as they are now, instead of the values seen during construction.
    // NOTE(review): assumes OldGenDecisionPolicy keeps no state between calls - confirm.
    final OldGenDecisionPolicy oldGenDecisionPolicy =
        new OldGenDecisionPolicy(this.getAppContext(), rcaConf);

    HotClusterSummary clusterSummary = flowUnit.getSummary();
    for (HotNodeSummary nodeSummary : clusterSummary.getHotNodeSummaryList()) {
      NodeKey esNode = new NodeKey(nodeSummary.getNodeID(), nodeSummary.getHostAddress());
      for (HotResourceSummary resource : nodeSummary.getHotResourceSummaryList()) {
        if (resource.getResource().equals(ResourceUtil.OLD_GEN_HEAP_USAGE)) {
          List<Action> actions = oldGenDecisionPolicy.actions(esNode, resource.getValue());
          actions.forEach(decision::addAction);
        }
        //TODO : Add policy for young gen
      }
    }
    return decision;
  }
}
Loading