Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add NodeConfigCollector to collect node configs(threadpool capacity etc.) from ES #252

Merged
merged 7 commits into from
Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric;

public class ThreadPool_QueueCapacity extends Metric {
public ThreadPool_QueueCapacity() {
super(AllMetrics.ThreadPoolValue.THREADPOOL_QUEUE_CAPACITY.name(), 5);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static double readDataFromSqlResult(Result<Record> result, Field<String>
double ret = Double.NaN;
try {
Record record = getRecordByName(result, matchedField, matchedFieldName);
ret = record.getValue(MetricsDB.MAX, Double.class);
ret = record.getValue(dataField, Double.class);
}
catch (IllegalArgumentException ie) {
LOG.error("{} fails to match any row in field {}.", matchedFieldName, matchedField.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HotNodeSummaryMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.NodeConfiguration;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.JooqFieldValue;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary;
import com.google.gson.JsonElement;
Expand Down Expand Up @@ -52,13 +53,17 @@ public class HotNodeSummary extends GenericSummary {
private final String hostAddress;
private List<HotResourceSummary> hotResourceSummaryList;
private List<HotShardSummary> hotShardSummaryList;
private NodeConfiguration nodeConfiguration;

public HotNodeSummary(String nodeID, String hostAddress) {
super();
this.nodeID = nodeID;
this.hostAddress = hostAddress;
this.hotResourceSummaryList = new ArrayList<>();
this.hotShardSummaryList = new ArrayList<>();
this.nodeConfiguration = NodeConfiguration.newBuilder()
.setSearchQueueCapacity(-1)
.setWriteQueueCapacity(-1).build();
}

public String getNodeID() {
Expand All @@ -85,6 +90,14 @@ public void appendNestedSummary(HotShardSummary summary) {
hotShardSummaryList.add(summary);
}

public void setNodeConfiguration(NodeConfiguration nodeConfiguration) {
this.nodeConfiguration = nodeConfiguration;
}

public NodeConfiguration getNodeConfiguration() {
return nodeConfiguration;
}

@Override
public HotNodeSummaryMessage buildSummaryMessage() {
final HotNodeSummaryMessage.Builder summaryMessageBuilder = HotNodeSummaryMessage.newBuilder();
Expand All @@ -98,6 +111,9 @@ public HotNodeSummaryMessage buildSummaryMessage() {
summaryMessageBuilder.getHotShardSummaryListBuilder()
.addHotShardSummary(hotShardSummary.buildSummaryMessage());
}
if (nodeConfiguration != null) {
summaryMessageBuilder.getNodeConfigurationBuilder().mergeFrom(nodeConfiguration);
}
return summaryMessageBuilder.build();
}

Expand All @@ -121,6 +137,7 @@ public static HotNodeSummary buildHotNodeSummaryFromMessage(HotNodeSummaryMessag
message.getHotShardSummaryList().getHotShardSummary(i)));
}
}
newSummary.setNodeConfiguration(message.getNodeConfiguration());
return newSummary;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.remediation;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.NodeConfiguration;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolType;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_QueueCapacity;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class NodeConfigurationRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
rguo-aws marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger LOG = LogManager.getLogger(NodeConfigurationRca.class);
private final Metric threadPool_queueCapacity;
private final int rcaPeriod;
private int counter;
private int writeQueueCapacity;
private int searchQueueCapacity;

public <M extends Metric> NodeConfigurationRca(int rcaPeriod, M threadPool_queueCapacity) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you briefly describe the rationale of using an RCA node for this ? Do you think it would be cleaner if we add a new node type called ESConfigNode ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yojs are you suggesting that this should be different kind a NonLeafNode and not extend from RCA. With a FlowUnit that is more tuned to node configurations?

I think that is cleaner in general. Will need changes to PersistorBase logic which needs to be refactored anyway to work with non flow units that are not from RCAs. It can be picked separately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a new type of node EsConfigNode and extend the class as PerformanceControllerConfigCollector

super(5);
this.threadPool_queueCapacity = threadPool_queueCapacity;
this.rcaPeriod = rcaPeriod;
this.counter = 0;
this.writeQueueCapacity = -1;
this.searchQueueCapacity = -1;
}

private void collectQueueCapacity(MetricFlowUnit flowUnit) {
double writeQueueCapacity = SQLParsingUtil.readDataFromSqlResult(flowUnit.getData(),
THREAD_POOL_TYPE.getField(), ThreadPoolType.WRITE.toString(), MetricsDB.MAX);
if (!Double.isNaN(writeQueueCapacity)) {
this.writeQueueCapacity = (int) writeQueueCapacity;
}
double searchQueueCapacity = SQLParsingUtil.readDataFromSqlResult(flowUnit.getData(),
THREAD_POOL_TYPE.getField(), ThreadPoolType.SEARCH.toString(), MetricsDB.MAX);
if (!Double.isNaN(searchQueueCapacity)) {
this.searchQueueCapacity = (int) searchQueueCapacity;
}
}

@Override
public ResourceFlowUnit<HotNodeSummary> operate() {
rguo-aws marked this conversation as resolved.
Show resolved Hide resolved
counter += 1;
for (MetricFlowUnit flowUnit : threadPool_queueCapacity.getFlowUnits()) {
if (flowUnit.isEmpty()) {
continue;
}
collectQueueCapacity(flowUnit);
}
if (counter == rcaPeriod) {
counter = 0;
ClusterDetailsEventProcessor.NodeDetails currentNode = ClusterDetailsEventProcessor
.getCurrentNodeDetails();
HotNodeSummary nodeSummary = new HotNodeSummary(currentNode.getId(), currentNode.getHostAddress());
nodeSummary.setNodeConfiguration(NodeConfiguration.newBuilder()
.setWriteQueueCapacity(writeQueueCapacity)
.setSearchQueueCapacity(searchQueueCapacity)
.build());
return new ResourceFlowUnit<>(System.currentTimeMillis(), new ResourceContext(Resources.State.HEALTHY), nodeSummary);
}
else {
return new ResourceFlowUnit<>(System.currentTimeMillis());
}
}

// TODO: move this method back into the Rca base class
@Override
public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
final List<FlowUnitMessage> flowUnitMessages =
args.getWireHopper().readFromWire(args.getNode());
List<ResourceFlowUnit<HotNodeSummary>> flowUnitList = new ArrayList<>();
LOG.debug("rca: Executing fromWire: {}", this.getClass().getSimpleName());
for (FlowUnitMessage flowUnitMessage : flowUnitMessages) {
flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage));
}
setFlowUnits(flowUnitList);
}
}
5 changes: 5 additions & 0 deletions src/main/proto/inter_node_rpc_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,16 @@ message HotShardSummaryMessage {
double io_sys_callrate_threshold = 9;
int32 timePeriod = 10;
}
message NodeConfiguration {
int32 writeQueueCapacity = 1;
int32 searchQueueCapacity = 2;
}
message HotNodeSummaryMessage {
string nodeID = 1;
string hostAddress = 2;
HotResourceSummaryList hotResourceSummaryList = 3;
HotShardSummaryList hotShardSummaryList = 4;
NodeConfiguration nodeConfiguration = 5;
}
message HotClusterSummaryMessage {
int32 nodeCount = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public MetricTestHelper(long evaluationIntervalSeconds, String name) {
context = DSL.using(new MockConnection(Mock.of(0)));
}

public void createEmptyFlowunit() {
this.flowUnits = Collections.singletonList(MetricFlowUnit.generic());
}

public void createTestFlowUnits(final List<String> fieldName, final List<String> row) {
List<String[]> stringData = new ArrayList<>();
stringData.add(fieldName.toArray(new String[0]));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.store.rca.remediation;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.NodeConfiguration;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolType;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.GradleTaskForRca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.MetricTestHelper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.remediation.NodeConfigurationRca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessorTestHelper;
import java.util.Arrays;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(GradleTaskForRca.class)
public class NodeConfigurationRcaTest {

private MetricTestHelper threadPool_QueueCapacity;
private NodeConfigurationRca nodeConfigurationRca;

@Before
public void init() throws Exception {
threadPool_QueueCapacity = new MetricTestHelper(5);
nodeConfigurationRca = new NodeConfigurationRca(1, threadPool_QueueCapacity);
ClusterDetailsEventProcessorTestHelper clusterDetailsEventProcessorTestHelper = new ClusterDetailsEventProcessorTestHelper();
clusterDetailsEventProcessorTestHelper.addNodeDetails("node1", "127.0.0.0", false);
clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent();
}

/**
* generate flowunit and bind the flowunits it generate to metrics
*/
private void mockFlowUnits(int writeQueueCapacity, int searchQueueCapacity) {
threadPool_QueueCapacity.createTestFlowUnitsWithMultipleRows(
Arrays.asList(THREAD_POOL_TYPE.toString(), MetricsDB.MAX),
Arrays.asList(
Arrays.asList(ThreadPoolType.WRITE.toString(), String.valueOf(writeQueueCapacity)),
Arrays.asList(ThreadPoolType.SEARCH.toString(), String.valueOf(searchQueueCapacity))
)
);
}

@Test
public void testCapacityMetricNotExist() {
threadPool_QueueCapacity.createEmptyFlowunit();
ResourceFlowUnit<HotNodeSummary> flowUnit = nodeConfigurationRca.operate();
Assert.assertFalse(flowUnit.isEmpty());
NodeConfiguration nodeConfiguration = flowUnit.getSummary().getNodeConfiguration();
Assert.assertEquals(-1, nodeConfiguration.getSearchQueueCapacity());
Assert.assertEquals(-1, nodeConfiguration.getWriteQueueCapacity());
}

@Test
public void testCapacityCollection() {
mockFlowUnits(100, 200);
ResourceFlowUnit<HotNodeSummary> flowUnit = nodeConfigurationRca.operate();
Assert.assertFalse(flowUnit.isEmpty());
NodeConfiguration nodeConfiguration = flowUnit.getSummary().getNodeConfiguration();
Assert.assertEquals(200, nodeConfiguration.getSearchQueueCapacity());
Assert.assertEquals(100, nodeConfiguration.getWriteQueueCapacity());
}
}