Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add NodeConfigCollector to collect node configs(threadpool capacity e…
Browse files Browse the repository at this point in the history
…tc.) from ES (#252)
  • Loading branch information
rguo-aws authored Jul 20, 2020
1 parent ec29ff6 commit 9137257
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.NodeConfigFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* this is a base class for node(vertex) in RCA graph that reads configuration settings from ES.
*/
public abstract class EsConfigNode extends NonLeafNode<NodeConfigFlowUnit> {

private static final Logger LOG = LogManager.getLogger(EsConfigNode.class);

public EsConfigNode() {
super(0, 5);
}

/**
* fetch flowunits from local graph node
*
* @param args The wrapper around the flow unit operation.
*/
@Override
public void generateFlowUnitListFromLocal(FlowUnitOperationArgWrapper args) {
long startTime = System.currentTimeMillis();
NodeConfigFlowUnit result;
try {
result = this.operate();
} catch (Exception ex) {
LOG.error("Exception in operate.", ex);
PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
ExceptionsAndErrors.EXCEPTION_IN_OPERATE, name(), 1);
result = new NodeConfigFlowUnit(System.currentTimeMillis());
}
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;

PerformanceAnalyzerApp.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(
RcaGraphMetrics.GRAPH_NODE_OPERATE_CALL, this.name(), duration);

setLocalFlowUnit(result);
}

@Override
public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
final List<FlowUnitMessage> flowUnitMessages =
args.getWireHopper().readFromWire(args.getNode());
List<NodeConfigFlowUnit> flowUnitList = new ArrayList<>();
LOG.debug("rca: Executing fromWire: {}", this.getClass().getSimpleName());
for (FlowUnitMessage flowUnitMessage : flowUnitMessages) {
flowUnitList.add(NodeConfigFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage));
}
setFlowUnits(flowUnitList);
}

@Override
public void handleNodeMuted() {
setLocalFlowUnit(new NodeConfigFlowUnit(System.currentTimeMillis()));
}

/**
* EsConfig metrics are not intended to be persisted
* @param args FlowUnitOperationArgWrapper
*/
@Override
public void persistFlowUnit(FlowUnitOperationArgWrapper args) {
assert true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage.SummaryOneofCase;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HotNodeSummaryMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.HashMap;

/**
* a flowunit type to carry ES node configurations (queue/cache capacities, etc.)
*/
public class NodeConfigFlowUnit extends ResourceFlowUnit<HotNodeSummary> {

private final HashMap<Resource, HotResourceSummary> configMap;

public NodeConfigFlowUnit(long timeStamp) {
super(timeStamp);
this.configMap = new HashMap<>();
}

public NodeConfigFlowUnit(long timeStamp, NodeKey nodeKey) {
super(timeStamp, new ResourceContext(Resources.State.HEALTHY), null, false);
this.setSummary(new HotNodeSummary(nodeKey.getNodeId(), nodeKey.getHostAddress()));
this.configMap = new HashMap<>();
}

/**
* Add new config setting to flowunit
* @param resource config setting type
* @param value config setting value
*/
public void addConfig(Resource resource, double value) {
HotResourceSummary configSummary = new HotResourceSummary(resource, Double.NaN, value, 0);
configMap.put(resource, configSummary);
}

/**
* Add new config setting to flowunit
* @param configSummary config setting summary object
*/
public void addConfig(HotResourceSummary configSummary) {
configMap.put(configSummary.getResource(), configSummary);
}

/**
* check if the config setting exist in flowunit
* @param resource config setting type
* @return if config exist
*/
public boolean hasConfig(Resource resource) {
return configMap.containsKey(resource);
}

/**
* read the config value of the config setting from flowunit
* @param resource config setting type
* @return config setting value
*/
public double readConfig(Resource resource) {
HotResourceSummary configSummary = configMap.getOrDefault(resource, null);
if (configSummary == null) {
return Double.NaN;
}
return configSummary.getValue();
}

@Override
public boolean isEmpty() {
return configMap.isEmpty();
}

/**
* build NodeConfigFlowUnit from the protobuf message
*/
public static NodeConfigFlowUnit buildFlowUnitFromWrapper(final FlowUnitMessage message) {
NodeConfigFlowUnit nodeConfigFlowUnit;
if (message.getSummaryOneofCase() == SummaryOneofCase.HOTNODESUMMARY) {
HotNodeSummaryMessage nodeSummaryMessage = message.getHotNodeSummary();
NodeKey nodeKey = new NodeKey(nodeSummaryMessage.getNodeID(),
nodeSummaryMessage.getHostAddress());
nodeConfigFlowUnit = new NodeConfigFlowUnit(message.getTimeStamp(), nodeKey);
if (nodeSummaryMessage.hasHotResourceSummaryList()) {
for (int i = 0;
i < nodeSummaryMessage.getHotResourceSummaryList().getHotResourceSummaryCount(); i++) {
nodeConfigFlowUnit.addConfig(
HotResourceSummary.buildHotResourceSummaryFromMessage(
nodeSummaryMessage.getHotResourceSummaryList().getHotResourceSummary(i))
);
}
}
} else {
nodeConfigFlowUnit = new NodeConfigFlowUnit(message.getTimeStamp());
}
return nodeConfigFlowUnit;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric;

public class ThreadPool_QueueCapacity extends Metric {
public ThreadPool_QueueCapacity() {
super(AllMetrics.ThreadPoolValue.THREADPOOL_QUEUE_CAPACITY.toString(), 5);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static double readDataFromSqlResult(Result<Record> result, Field<String>
double ret = Double.NaN;
try {
Record record = getRecordByName(result, matchedField, matchedFieldName);
ret = record.getValue(MetricsDB.MAX, Double.class);
ret = record.getValue(dataField, Double.class);
}
catch (IllegalArgumentException ie) {
LOG.error("{} fails to match any row in field {}.", matchedFieldName, matchedField.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ public List<HotShardSummary> getHotShardSummaryList() {
}

public void appendNestedSummary(HotResourceSummary summary) {
hotResourceSummaryList.add(summary);
if (summary != null) {
hotResourceSummaryList.add(summary);
}
}

public void appendNestedSummary(HotShardSummary summary) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public class ResourceUtil {
public static final Resource SEARCH_QUEUE_REJECTION = Resource.newBuilder()
.setResourceEnum(ResourceEnum.SEARCH_THREADPOOL)
.setMetricEnum(MetricEnum.QUEUE_REJECTION).build();
public static final Resource WRITE_QUEUE_CAPACITY = Resource.newBuilder()
.setResourceEnum(ResourceEnum.WRITE_THREADPOOL)
.setMetricEnum(MetricEnum.QUEUE_CAPACITY).build();
public static final Resource SEARCH_QUEUE_CAPACITY = Resource.newBuilder()
.setResourceEnum(ResourceEnum.SEARCH_THREADPOOL)
.setMetricEnum(MetricEnum.QUEUE_CAPACITY).build();

/**
* Read the resourceType name from the ResourceType object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import org.apache.commons.lang3.builder.HashCodeBuilder;

public class NodeKey {
Expand All @@ -26,6 +27,10 @@ public NodeKey(String nodeId, String hostAddress) {
this.hostAddress = hostAddress;
}

public NodeKey(InstanceDetails instanceDetails) {
this(instanceDetails.getInstanceId(), instanceDetails.getInstanceIp());
}

public String getNodeId() {
return nodeId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.remediation;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolType;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.EsConfigNode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.NodeConfigFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_QueueCapacity;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.HashMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* This is a node level collector in RCA graph which collect the current config settings from ES (queue/cache capacity etc.)
* And pass them down to Decision Maker for the next round of resource auto-tuning.
*/
public class NodeConfigCollector extends EsConfigNode {

private static final Logger LOG = LogManager.getLogger(NodeConfigCollector.class);
private final ThreadPool_QueueCapacity threadPool_queueCapacity;
private final int rcaPeriod;
private int counter;
private final HashMap<Resource, Double> configResult;

public NodeConfigCollector(int rcaPeriod, ThreadPool_QueueCapacity threadPool_queueCapacity) {
this.threadPool_queueCapacity = threadPool_queueCapacity;
this.rcaPeriod = rcaPeriod;
this.counter = 0;
this.configResult = new HashMap<>();
}

private void collectQueueCapacity(MetricFlowUnit flowUnit) {
double writeQueueCapacity = SQLParsingUtil.readDataFromSqlResult(flowUnit.getData(),
THREAD_POOL_TYPE.getField(), ThreadPoolType.WRITE.toString(), MetricsDB.MAX);
if (!Double.isNaN(writeQueueCapacity)) {
configResult.put(ResourceUtil.WRITE_QUEUE_CAPACITY, writeQueueCapacity);
}
else {
LOG.error("write queue capacity is NaN");
}
double searchQueueCapacity = SQLParsingUtil.readDataFromSqlResult(flowUnit.getData(),
THREAD_POOL_TYPE.getField(), ThreadPoolType.SEARCH.toString(), MetricsDB.MAX);
if (!Double.isNaN(searchQueueCapacity)) {
configResult.put(ResourceUtil.SEARCH_QUEUE_CAPACITY, searchQueueCapacity);
}
else {
LOG.error("search queue capacity is NaN");
}
}

/**
* collect config settings from the upstream metric flowunits and set them into the protobuf
* message PerformanceControllerConfiguration. This will allow us to serialize / de-serialize
* the config settings across grpc and send them to Decision Maker on elected master.
* @return ResourceFlowUnit with HotNodeSummary. And HotNodeSummary carries PerformanceControllerConfiguration
*/
@Override
public NodeConfigFlowUnit operate() {
counter += 1;
for (MetricFlowUnit flowUnit : threadPool_queueCapacity.getFlowUnits()) {
if (flowUnit.isEmpty()) {
continue;
}
collectQueueCapacity(flowUnit);
}
if (counter == rcaPeriod) {
counter = 0;
NodeConfigFlowUnit flowUnits = new NodeConfigFlowUnit(System.currentTimeMillis(), new NodeKey(getInstanceDetails()));
configResult.forEach(flowUnits::addConfig);
return flowUnits;
}
else {
return new NodeConfigFlowUnit(System.currentTimeMillis());
}
}
}
1 change: 1 addition & 0 deletions src/main/proto/inter_node_rpc_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ enum MetricEnum {

// threadpool
QUEUE_REJECTION = 6 [(additional_fields).name = "queue rejection", (additional_fields).description = "rejection period in second"];
QUEUE_CAPACITY = 7 [(additional_fields).name = "queue capacity", (additional_fields).description = "max capacity of the queue"];
}

/*
Expand Down
Loading

0 comments on commit 9137257

Please sign in to comment.