From 026498d317577295d15e6229792f0967b520f26f Mon Sep 17 00:00:00 2001 From: Ruizhen Date: Thu, 18 Jun 2020 15:24:35 -0700 Subject: [PATCH 1/5] Add NodeConfigurationRca to collect threadpool config settings from each node --- .../api/metrics/ThreadPool_QueueCapacity.java | 10 ++ .../framework/api/persist/SQLParsingUtil.java | 2 +- .../api/summaries/HotNodeSummary.java | 17 ++++ .../rca/remediation/NodeConfigurationRca.java | 93 +++++++++++++++++++ src/main/proto/inter_node_rpc_service.proto | 5 + .../api/metrics/MetricTestHelper.java | 4 + .../remediation/NodeConfigurationRcaTest.java | 67 +++++++++++++ 7 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/ThreadPool_QueueCapacity.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigurationRca.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigurationRcaTest.java diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/ThreadPool_QueueCapacity.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/ThreadPool_QueueCapacity.java new file mode 100644 index 000000000..563136ca3 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/ThreadPool_QueueCapacity.java @@ -0,0 +1,10 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric; + +public class ThreadPool_QueueCapacity extends Metric { + public ThreadPool_QueueCapacity() { + super(AllMetrics.ThreadPoolValue.THREADPOOL_QUEUE_CAPACITY.name(), 5); + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/persist/SQLParsingUtil.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/persist/SQLParsingUtil.java index ec828e2bd..26139f451 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/persist/SQLParsingUtil.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/persist/SQLParsingUtil.java @@ -73,7 +73,7 @@ public static double readDataFromSqlResult(Result result, Field double ret = Double.NaN; try { Record record = getRecordByName(result, matchedField, matchedFieldName); - ret = record.getValue(MetricsDB.MAX, Double.class); + ret = record.getValue(dataField, Double.class); } catch (IllegalArgumentException ie) { LOG.error("{} fails to match any row in field {}.", matchedFieldName, matchedField.getName()); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java index e670fc8fc..f5c5c36c1 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java @@ -17,6 +17,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HotNodeSummaryMessage; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.NodeConfiguration; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.JooqFieldValue; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.google.gson.JsonElement; @@ -52,6 +53,7 @@ public class HotNodeSummary extends GenericSummary { private final String hostAddress; private List hotResourceSummaryList; private List hotShardSummaryList; + private NodeConfiguration nodeConfiguration; public HotNodeSummary(String nodeID, String hostAddress) { super(); @@ -59,6 +61,9 @@ public HotNodeSummary(String nodeID, String hostAddress) { this.hostAddress = hostAddress; this.hotResourceSummaryList = new ArrayList<>(); this.hotShardSummaryList = new ArrayList<>(); + this.nodeConfiguration = NodeConfiguration.newBuilder() + .setSearchQueueCapacity(-1) + .setWriteQueueCapacity(-1).build(); } public String getNodeID() { @@ -85,6 +90,14 @@ public void appendNestedSummary(HotShardSummary summary) { hotShardSummaryList.add(summary); } + public void setNodeConfiguration(NodeConfiguration nodeConfiguration) { + this.nodeConfiguration = nodeConfiguration; + } + + public NodeConfiguration getNodeConfiguration() { + return nodeConfiguration; + } + @Override public HotNodeSummaryMessage buildSummaryMessage() { final HotNodeSummaryMessage.Builder summaryMessageBuilder = HotNodeSummaryMessage.newBuilder(); @@ -98,6 +111,9 @@ public HotNodeSummaryMessage buildSummaryMessage() { summaryMessageBuilder.getHotShardSummaryListBuilder() .addHotShardSummary(hotShardSummary.buildSummaryMessage()); } + if (nodeConfiguration != null) { + summaryMessageBuilder.getNodeConfigurationBuilder().mergeFrom(nodeConfiguration); + } return summaryMessageBuilder.build(); } @@ -121,6 +137,7 @@ public static HotNodeSummary buildHotNodeSummaryFromMessage(HotNodeSummaryMessag message.getHotShardSummaryList().getHotShardSummary(i))); } } + newSummary.setNodeConfiguration(message.getNodeConfiguration()); return newSummary; } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigurationRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigurationRca.java new file mode 100644 index 000000000..76f921be0 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigurationRca.java @@ -0,0 +1,93 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.remediation; + +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.NodeConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_QueueCapacity; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; +import java.util.ArrayList; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class NodeConfigurationRca extends Rca> { + + private static final Logger LOG = LogManager.getLogger(NodeConfigurationRca.class); + private final Metric threadPool_queueCapacity; + private final int rcaPeriod; + private int counter; + private int writeQueueCapacity; + private int searchQueueCapacity; + + public NodeConfigurationRca(int rcaPeriod, M threadPool_queueCapacity) { + super(5); + this.threadPool_queueCapacity = threadPool_queueCapacity; + this.rcaPeriod = rcaPeriod; + this.counter = 0; + this.writeQueueCapacity = -1; + this.searchQueueCapacity = -1; + } + + private void collectQueueCapacity(MetricFlowUnit flowUnit) { + double writeQueueCapacity = SQLParsingUtil.readDataFromSqlResult(flowUnit.getData(), + THREAD_POOL_TYPE.getField(), ThreadPoolType.WRITE.toString(), MetricsDB.MAX); + if (!Double.isNaN(writeQueueCapacity)) { + this.writeQueueCapacity = (int) writeQueueCapacity; + } + double searchQueueCapacity = SQLParsingUtil.readDataFromSqlResult(flowUnit.getData(), + THREAD_POOL_TYPE.getField(), ThreadPoolType.SEARCH.toString(), MetricsDB.MAX); + if (!Double.isNaN(searchQueueCapacity)) { + this.searchQueueCapacity = (int) searchQueueCapacity; + } + } + + @Override + public ResourceFlowUnit operate() { + counter += 1; + for (MetricFlowUnit flowUnit : threadPool_queueCapacity.getFlowUnits()) { + if (flowUnit.isEmpty()) { + continue; + } + collectQueueCapacity(flowUnit); + } + if (counter == rcaPeriod) { + counter = 0; + ClusterDetailsEventProcessor.NodeDetails currentNode = ClusterDetailsEventProcessor + .getCurrentNodeDetails(); + HotNodeSummary nodeSummary = new HotNodeSummary(currentNode.getId(), currentNode.getHostAddress()); + nodeSummary.setNodeConfiguration(NodeConfiguration.newBuilder() + .setWriteQueueCapacity(writeQueueCapacity) + .setSearchQueueCapacity(searchQueueCapacity) + .build()); + return new ResourceFlowUnit<>(System.currentTimeMillis(), new ResourceContext(Resources.State.HEALTHY), nodeSummary); + } + else { + return new ResourceFlowUnit<>(System.currentTimeMillis()); + } + } + + // TODO: move this method back into the Rca base class + @Override + public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { + final List flowUnitMessages = + args.getWireHopper().readFromWire(args.getNode()); + List> flowUnitList = new ArrayList<>(); + LOG.debug("rca: Executing fromWire: {}", this.getClass().getSimpleName()); + for (FlowUnitMessage flowUnitMessage : flowUnitMessages) { + flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage)); + } + setFlowUnits(flowUnitList); + } +} diff --git a/src/main/proto/inter_node_rpc_service.proto b/src/main/proto/inter_node_rpc_service.proto index 2678ba2a7..381c29a68 100644 --- a/src/main/proto/inter_node_rpc_service.proto +++ b/src/main/proto/inter_node_rpc_service.proto @@ -126,11 +126,16 @@ message HotShardSummaryMessage { double io_sys_callrate_threshold = 9; int32 timePeriod = 10; } +message NodeConfiguration { + int32 writeQueueCapacity = 1; + int32 searchQueueCapacity = 2; +} message HotNodeSummaryMessage { string nodeID = 1; string hostAddress = 2; HotResourceSummaryList hotResourceSummaryList = 3; HotShardSummaryList hotShardSummaryList = 4; + NodeConfiguration nodeConfiguration = 5; } message HotClusterSummaryMessage { int32 nodeCount = 1; diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/MetricTestHelper.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/MetricTestHelper.java index d8fcddde3..b85235819 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/MetricTestHelper.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/MetricTestHelper.java @@ -42,6 +42,10 @@ public MetricTestHelper(long evaluationIntervalSeconds, String name) { context = DSL.using(new MockConnection(Mock.of(0))); } + public void createEmptyFlowunit() { + this.flowUnits = Collections.singletonList(MetricFlowUnit.generic()); + } + public void createTestFlowUnits(final List fieldName, final List row) { List stringData = new ArrayList<>(); stringData.add(fieldName.toArray(new String[0])); diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigurationRcaTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigurationRcaTest.java new file mode 100644 index 000000000..30872e75c --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigurationRcaTest.java @@ -0,0 +1,67 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.store.rca.remediation; + +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.NodeConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.GradleTaskForRca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.MetricTestHelper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.remediation.NodeConfigurationRca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessorTestHelper; +import java.util.Arrays; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(GradleTaskForRca.class) +public class NodeConfigurationRcaTest { + + private MetricTestHelper threadPool_QueueCapacity; + private NodeConfigurationRca nodeConfigurationRca; + + @Before + public void init() throws Exception { + threadPool_QueueCapacity = new MetricTestHelper(5); + nodeConfigurationRca = new NodeConfigurationRca(1, threadPool_QueueCapacity); + ClusterDetailsEventProcessorTestHelper clusterDetailsEventProcessorTestHelper = new ClusterDetailsEventProcessorTestHelper(); + clusterDetailsEventProcessorTestHelper.addNodeDetails("node1", "127.0.0.0", false); + clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent(); + } + + /** + * generate flowunit and bind the flowunits it generate to metrics + */ + private void mockFlowUnits(int writeQueueCapacity, int searchQueueCapacity) { + threadPool_QueueCapacity.createTestFlowUnitsWithMultipleRows( + Arrays.asList(THREAD_POOL_TYPE.toString(), MetricsDB.MAX), + Arrays.asList( + Arrays.asList(ThreadPoolType.WRITE.toString(), String.valueOf(writeQueueCapacity)), + Arrays.asList(ThreadPoolType.SEARCH.toString(), String.valueOf(searchQueueCapacity)) + ) + ); + } + + @Test + public void testCapacityMetricNotExist() { + threadPool_QueueCapacity.createEmptyFlowunit(); + ResourceFlowUnit flowUnit = nodeConfigurationRca.operate(); + Assert.assertFalse(flowUnit.isEmpty()); + NodeConfiguration nodeConfiguration = flowUnit.getSummary().getNodeConfiguration(); + Assert.assertEquals(-1, nodeConfiguration.getSearchQueueCapacity()); + Assert.assertEquals(-1, nodeConfiguration.getWriteQueueCapacity()); + } + + @Test + public void testCapacityCollection() { + mockFlowUnits(100, 200); + ResourceFlowUnit flowUnit = nodeConfigurationRca.operate(); + Assert.assertFalse(flowUnit.isEmpty()); + NodeConfiguration nodeConfiguration = flowUnit.getSummary().getNodeConfiguration(); + Assert.assertEquals(200, nodeConfiguration.getSearchQueueCapacity()); + Assert.assertEquals(100, nodeConfiguration.getWriteQueueCapacity()); + } +} From 52bc51a25c562b1a5d574bd8e65508dd1f6665d3 Mon Sep 17 00:00:00 2001 From: Ruizhen Date: Mon, 29 Jun 2020 17:52:16 -0700 Subject: [PATCH 2/5] Create a separate node type EsConfigNode to collect ES config settings --- .../rca/framework/api/EsConfigNode.java | 78 +++++++++++++++++++ .../api/summaries/HotNodeSummary.java | 21 ++--- ...PerformanceControllerConfigCollector.java} | 42 ++++------ src/main/proto/inter_node_rpc_service.proto | 4 +- ...ormanceControllerConfigCollectorTest.java} | 26 +++---- 5 files changed, 121 insertions(+), 50 deletions(-) create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/EsConfigNode.java rename src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/{NodeConfigurationRca.java => PerformanceControllerConfigCollector.java} (71%) rename src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/{NodeConfigurationRcaTest.java => PerformanceControllerConfigCollectorTest.java} (67%) diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/EsConfigNode.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/EsConfigNode.java new file mode 100644 index 000000000..8b47d9a72 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/EsConfigNode.java @@ -0,0 +1,78 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; +import java.util.ArrayList; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * this is a base class for node(vertex) in RCA graph that reads configuration settings from ES. + */ +public abstract class EsConfigNode extends NonLeafNode> { + + private static final Logger LOG = LogManager.getLogger(EsConfigNode.class); + + public EsConfigNode() { + super(0, 5); + } + + /** + * fetch flowunits from local graph node + * + * @param args The wrapper around the flow unit operation. + */ + @Override + public void generateFlowUnitListFromLocal(FlowUnitOperationArgWrapper args) { + long startTime = System.currentTimeMillis(); + + ResourceFlowUnit result; + try { + result = this.operate(); + } catch (Exception ex) { + LOG.error("Exception in operate.", ex); + PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat( + ExceptionsAndErrors.EXCEPTION_IN_OPERATE, name(), 1); + result = new ResourceFlowUnit<>(System.currentTimeMillis()); + } + long endTime = System.currentTimeMillis(); + long duration = endTime - startTime; + + PerformanceAnalyzerApp.RCA_GRAPH_METRICS_AGGREGATOR.updateStat( + RcaGraphMetrics.GRAPH_NODE_OPERATE_CALL, this.name(), duration); + + setLocalFlowUnit(result); + } + + @Override + public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { + final List flowUnitMessages = + args.getWireHopper().readFromWire(args.getNode()); + List> flowUnitList = new ArrayList<>(); + LOG.debug("rca: Executing fromWire: {}", this.getClass().getSimpleName()); + for (FlowUnitMessage flowUnitMessage : flowUnitMessages) { + flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage)); + } + setFlowUnits(flowUnitList); + } + + @Override + public void handleNodeMuted() { + setLocalFlowUnit(new ResourceFlowUnit<>(System.currentTimeMillis())); + } + + /** + * EsConfig metrics are not intended to be persisted + * @param args FlowUnitOperationArgWrapper + */ + @Override + public void persistFlowUnit(FlowUnitOperationArgWrapper args) { + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java index f5c5c36c1..4b169babb 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java @@ -17,7 +17,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HotNodeSummaryMessage; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.NodeConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.PerformanceControllerConfiguration; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.JooqFieldValue; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.google.gson.JsonElement; @@ -53,7 +53,7 @@ public class HotNodeSummary extends GenericSummary { private final String hostAddress; private List hotResourceSummaryList; private List hotShardSummaryList; - private NodeConfiguration nodeConfiguration; + private PerformanceControllerConfiguration performanceControllerConfiguration; public HotNodeSummary(String nodeID, String hostAddress) { super(); @@ -61,7 +61,7 @@ public HotNodeSummary(String nodeID, String hostAddress) { this.hostAddress = hostAddress; this.hotResourceSummaryList = new ArrayList<>(); this.hotShardSummaryList = new ArrayList<>(); - this.nodeConfiguration = NodeConfiguration.newBuilder() + this.performanceControllerConfiguration = PerformanceControllerConfiguration.newBuilder() .setSearchQueueCapacity(-1) .setWriteQueueCapacity(-1).build(); } @@ -90,12 +90,12 @@ public void appendNestedSummary(HotShardSummary summary) { hotShardSummaryList.add(summary); } - public void setNodeConfiguration(NodeConfiguration nodeConfiguration) { - this.nodeConfiguration = nodeConfiguration; + public void setPerformanceControllerConfiguration(PerformanceControllerConfiguration performanceControllerConfiguration) { + this.performanceControllerConfiguration = performanceControllerConfiguration; } - public NodeConfiguration getNodeConfiguration() { - return nodeConfiguration; + public PerformanceControllerConfiguration getPerformanceControllerConfiguration() { + return performanceControllerConfiguration; } @Override @@ -111,8 +111,9 @@ public HotNodeSummaryMessage buildSummaryMessage() { summaryMessageBuilder.getHotShardSummaryListBuilder() .addHotShardSummary(hotShardSummary.buildSummaryMessage()); } - if (nodeConfiguration != null) { - summaryMessageBuilder.getNodeConfigurationBuilder().mergeFrom(nodeConfiguration); + if (performanceControllerConfiguration != null) { + summaryMessageBuilder.getPerformanceControllerConfigurationBuilder().mergeFrom( + performanceControllerConfiguration); } return summaryMessageBuilder.build(); } @@ -137,7 +138,7 @@ public static HotNodeSummary buildHotNodeSummaryFromMessage(HotNodeSummaryMessag message.getHotShardSummaryList().getHotShardSummary(i))); } } - newSummary.setNodeConfiguration(message.getNodeConfiguration()); + newSummary.setPerformanceControllerConfiguration(message.getPerformanceControllerConfiguration()); return newSummary; } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigurationRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/PerformanceControllerConfigCollector.java similarity index 71% rename from src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigurationRca.java rename to src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/PerformanceControllerConfigCollector.java index 76f921be0..59803f14d 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigurationRca.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/PerformanceControllerConfigCollector.java @@ -2,37 +2,36 @@ import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.NodeConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.PerformanceControllerConfiguration; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.EsConfigNode; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_QueueCapacity; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; -import java.util.ArrayList; -import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class NodeConfigurationRca extends Rca> { +/** + * This is a node level collector in RCA graph which collect the current config settings of PerformanceControllor + * PerformanceController is a ES plugin that helps with cache/queue auto tuning and this collector collect configs + * set by PerformanceController and pass them down to Decision Maker for the next round of resource auto-tuning. + */ +public class PerformanceControllerConfigCollector extends EsConfigNode { - private static final Logger LOG = LogManager.getLogger(NodeConfigurationRca.class); + private static final Logger LOG = LogManager.getLogger(PerformanceControllerConfigCollector.class); private final Metric threadPool_queueCapacity; private final int rcaPeriod; private int counter; private int writeQueueCapacity; private int searchQueueCapacity; - public NodeConfigurationRca(int rcaPeriod, M threadPool_queueCapacity) { - super(5); + public PerformanceControllerConfigCollector(int rcaPeriod, M threadPool_queueCapacity) { this.threadPool_queueCapacity = threadPool_queueCapacity; this.rcaPeriod = rcaPeriod; this.counter = 0; @@ -53,6 +52,12 @@ private void collectQueueCapacity(MetricFlowUnit flowUnit) { } } + /** + * collect config settings from the upstream metric flowunits and set them into the protobuf + * message PerformanceControllerConfiguration. This will allow us to serialize / de-serialize + * the config settings across grpc and send them to Decision Maker on elected master. + * @return ResourceFlowUnit with HotNodeSummary. And HotNodeSummary carries PerformanceControllerConfiguration + */ @Override public ResourceFlowUnit operate() { counter += 1; @@ -67,7 +72,7 @@ public ResourceFlowUnit operate() { ClusterDetailsEventProcessor.NodeDetails currentNode = ClusterDetailsEventProcessor .getCurrentNodeDetails(); HotNodeSummary nodeSummary = new HotNodeSummary(currentNode.getId(), currentNode.getHostAddress()); - nodeSummary.setNodeConfiguration(NodeConfiguration.newBuilder() + nodeSummary.setPerformanceControllerConfiguration(PerformanceControllerConfiguration.newBuilder() .setWriteQueueCapacity(writeQueueCapacity) .setSearchQueueCapacity(searchQueueCapacity) .build()); @@ -77,17 +82,4 @@ public ResourceFlowUnit operate() { return new ResourceFlowUnit<>(System.currentTimeMillis()); } } - - // TODO: move this method back into the Rca base class - @Override - public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { - final List flowUnitMessages = - args.getWireHopper().readFromWire(args.getNode()); - List> flowUnitList = new ArrayList<>(); - LOG.debug("rca: Executing fromWire: {}", this.getClass().getSimpleName()); - for (FlowUnitMessage flowUnitMessage : flowUnitMessages) { - flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage)); - } - setFlowUnits(flowUnitList); - } } diff --git a/src/main/proto/inter_node_rpc_service.proto b/src/main/proto/inter_node_rpc_service.proto index 381c29a68..99b7062ea 100644 --- a/src/main/proto/inter_node_rpc_service.proto +++ b/src/main/proto/inter_node_rpc_service.proto @@ -126,7 +126,7 @@ message HotShardSummaryMessage { double io_sys_callrate_threshold = 9; int32 timePeriod = 10; } -message NodeConfiguration { +message PerformanceControllerConfiguration { int32 writeQueueCapacity = 1; int32 searchQueueCapacity = 2; } @@ -135,7 +135,7 @@ message HotNodeSummaryMessage { string hostAddress = 2; HotResourceSummaryList hotResourceSummaryList = 3; HotShardSummaryList hotShardSummaryList = 4; - NodeConfiguration nodeConfiguration = 5; + PerformanceControllerConfiguration performanceControllerConfiguration = 5; } message HotClusterSummaryMessage { int32 nodeCount = 1; diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigurationRcaTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/PerformanceControllerConfigCollectorTest.java similarity index 67% rename from src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigurationRcaTest.java rename to src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/PerformanceControllerConfigCollectorTest.java index 30872e75c..e518beb59 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigurationRcaTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/PerformanceControllerConfigCollectorTest.java @@ -2,14 +2,14 @@ import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.NodeConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.PerformanceControllerConfiguration; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.GradleTaskForRca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.MetricTestHelper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.remediation.NodeConfigurationRca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.remediation.PerformanceControllerConfigCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessorTestHelper; import java.util.Arrays; import org.junit.Assert; @@ -18,15 +18,15 @@ import org.junit.experimental.categories.Category; @Category(GradleTaskForRca.class) -public class NodeConfigurationRcaTest { +public class PerformanceControllerConfigCollectorTest { private MetricTestHelper threadPool_QueueCapacity; - private NodeConfigurationRca nodeConfigurationRca; + private PerformanceControllerConfigCollector performanceControllerConfigCollector; @Before public void init() throws Exception { threadPool_QueueCapacity = new MetricTestHelper(5); - nodeConfigurationRca = new NodeConfigurationRca(1, threadPool_QueueCapacity); + performanceControllerConfigCollector = new PerformanceControllerConfigCollector(1, threadPool_QueueCapacity); ClusterDetailsEventProcessorTestHelper clusterDetailsEventProcessorTestHelper = new ClusterDetailsEventProcessorTestHelper(); clusterDetailsEventProcessorTestHelper.addNodeDetails("node1", "127.0.0.0", false); clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent(); @@ -48,20 +48,20 @@ private void mockFlowUnits(int writeQueueCapacity, int searchQueueCapacity) { @Test public void testCapacityMetricNotExist() { threadPool_QueueCapacity.createEmptyFlowunit(); - ResourceFlowUnit flowUnit = nodeConfigurationRca.operate(); + ResourceFlowUnit flowUnit = performanceControllerConfigCollector.operate(); Assert.assertFalse(flowUnit.isEmpty()); - NodeConfiguration nodeConfiguration = flowUnit.getSummary().getNodeConfiguration(); - Assert.assertEquals(-1, nodeConfiguration.getSearchQueueCapacity()); - Assert.assertEquals(-1, nodeConfiguration.getWriteQueueCapacity()); + PerformanceControllerConfiguration performanceControllerConfiguration = flowUnit.getSummary().getPerformanceControllerConfiguration(); + Assert.assertEquals(-1, performanceControllerConfiguration.getSearchQueueCapacity()); + Assert.assertEquals(-1, performanceControllerConfiguration.getWriteQueueCapacity()); } @Test public void testCapacityCollection() { mockFlowUnits(100, 200); - ResourceFlowUnit flowUnit = nodeConfigurationRca.operate(); + ResourceFlowUnit flowUnit = performanceControllerConfigCollector.operate(); Assert.assertFalse(flowUnit.isEmpty()); - NodeConfiguration nodeConfiguration = flowUnit.getSummary().getNodeConfiguration(); - Assert.assertEquals(200, nodeConfiguration.getSearchQueueCapacity()); - Assert.assertEquals(100, nodeConfiguration.getWriteQueueCapacity()); + PerformanceControllerConfiguration performanceControllerConfiguration = flowUnit.getSummary().getPerformanceControllerConfiguration(); + Assert.assertEquals(200, performanceControllerConfiguration.getSearchQueueCapacity()); + Assert.assertEquals(100, performanceControllerConfiguration.getWriteQueueCapacity()); } } From b1eb04d53e292e7062b5eaad98f0b9063c2f883c Mon Sep 17 00:00:00 2001 From: Ruizhen Date: Tue, 14 Jul 2020 11:22:35 -0700 Subject: [PATCH 3/5] Add NodeConfigFlowunit --- .../rca/framework/api/EsConfigNode.java | 15 ++-- .../api/flow_units/NodeConfigFlowUnit.java | 71 +++++++++++++++++++ .../api/summaries/HotNodeSummary.java | 34 ++++----- .../framework/api/summaries/ResourceUtil.java | 6 ++ ...ollector.java => NodeConfigCollector.java} | 49 +++++++------ src/main/proto/inter_node_rpc_service.proto | 6 +- ...Test.java => NodeConfigCollectorTest.java} | 12 ++-- 7 files changed, 136 insertions(+), 57 deletions(-) create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/flow_units/NodeConfigFlowUnit.java rename src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/{PerformanceControllerConfigCollector.java => NodeConfigCollector.java} (71%) rename src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/{PerformanceControllerConfigCollectorTest.java => NodeConfigCollectorTest.java} (86%) diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/EsConfigNode.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/EsConfigNode.java index 8b47d9a72..94eff735d 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/EsConfigNode.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/EsConfigNode.java @@ -2,8 +2,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.NodeConfigFlowUnit; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics; @@ -16,7 +15,7 @@ /** * this is a base class for node(vertex) in RCA graph that reads configuration settings from ES. */ -public abstract class EsConfigNode extends NonLeafNode> { +public abstract class EsConfigNode extends NonLeafNode { private static final Logger LOG = LogManager.getLogger(EsConfigNode.class); @@ -33,14 +32,14 @@ public EsConfigNode() { public void generateFlowUnitListFromLocal(FlowUnitOperationArgWrapper args) { long startTime = System.currentTimeMillis(); - ResourceFlowUnit result; + NodeConfigFlowUnit result; try { result = this.operate(); } catch (Exception ex) { LOG.error("Exception in operate.", ex); PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat( ExceptionsAndErrors.EXCEPTION_IN_OPERATE, name(), 1); - result = new ResourceFlowUnit<>(System.currentTimeMillis()); + result = new NodeConfigFlowUnit(System.currentTimeMillis()); } long endTime = System.currentTimeMillis(); long duration = endTime - startTime; @@ -55,17 +54,17 @@ public void generateFlowUnitListFromLocal(FlowUnitOperationArgWrapper args) { public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { final List flowUnitMessages = args.getWireHopper().readFromWire(args.getNode()); - List> flowUnitList = new ArrayList<>(); + List flowUnitList = new ArrayList<>(); LOG.debug("rca: Executing fromWire: {}", this.getClass().getSimpleName()); for (FlowUnitMessage flowUnitMessage : flowUnitMessages) { - flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage)); + flowUnitList.add(NodeConfigFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage)); } setFlowUnits(flowUnitList); } @Override public void handleNodeMuted() { - setLocalFlowUnit(new ResourceFlowUnit<>(System.currentTimeMillis())); + setLocalFlowUnit(new NodeConfigFlowUnit(System.currentTimeMillis())); } /** diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/flow_units/NodeConfigFlowUnit.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/flow_units/NodeConfigFlowUnit.java new file mode 100644 index 000000000..d98fbe9c1 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/flow_units/NodeConfigFlowUnit.java @@ -0,0 +1,71 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage.SummaryOneofCase; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; +import java.util.HashMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class NodeConfigFlowUnit extends ResourceFlowUnit { + + private static final Logger LOG = LogManager.getLogger(NodeConfigFlowUnit.class); + private final HashMap configMap; + + public NodeConfigFlowUnit(long timeStamp) { + super(timeStamp); + } + + public NodeConfigFlowUnit(long timeStamp, HotNodeSummary summary) { + super(timeStamp, new ResourceContext(Resources.State.HEALTHY), summary, false); + configMap = new HashMap<>(); + } + + public void addConfig(Resource resource, double value) { + HotResourceSummary configSummary = new HotResourceSummary(resource, Double.NaN, value, 0); + configMap.put(resource, configSummary); + } + + public double readConfig(Resource resource) { + HotResourceSummary configSummary = configMap.getOrDefault(resource, null); + if (configSummary == null) { + return Double.NaN; + } + return configSummary.getValue(); + } + + /** + * parse the "oneof" section in protocol buffer call the corresponding object build function for + * each summary type + */ + public static NodeConfigFlowUnit buildFlowUnitFromWrapper(final FlowUnitMessage message) { + //if the flowunit is empty. empty flowunit does not have context + if (message.hasResourceContext()) { + ResourceContext newContext = ResourceContext + .buildResourceContextFromMessage(message.getResourceContext()); + HotNodeSummary newSummary = null; + try { + if (message.getSummaryOneofCase() == SummaryOneofCase.HOTNODESUMMARY) { + newSummary = HotNodeSummary + .buildHotNodeSummaryFromMessage(message.getHotNodeSummary()); + } else { + throw new IllegalArgumentException(); + } + } catch (Exception e) { + // we are not supposed to run into this unless we specified wrong summary template + // for this function. Make sure the summary type passed in as template are consistent + // between serialization and de-serializing. + LOG.error("RCA: casting to wrong summary type when de-serializing this flowunit"); + } + return new NodeConfigFlowUnit(message.getTimeStamp(), newSummary); + } else { + //empty flowunit; + return new NodeConfigFlowUnit(message.getTimeStamp()); + } + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java index 4b169babb..a800463a2 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java @@ -17,7 +17,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HotNodeSummaryMessage; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.PerformanceControllerConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.JooqFieldValue; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.google.gson.JsonElement; @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import javax.annotation.Nullable; import org.apache.logging.log4j.LogManager; @@ -53,7 +54,7 @@ public class HotNodeSummary extends GenericSummary { private final String hostAddress; private List hotResourceSummaryList; private List hotShardSummaryList; - private PerformanceControllerConfiguration performanceControllerConfiguration; + private final HashMap resourceMap; public HotNodeSummary(String nodeID, String hostAddress) { super(); @@ -61,9 +62,7 @@ public HotNodeSummary(String nodeID, String hostAddress) { this.hostAddress = hostAddress; this.hotResourceSummaryList = new ArrayList<>(); this.hotShardSummaryList = new ArrayList<>(); - this.performanceControllerConfiguration = PerformanceControllerConfiguration.newBuilder() - .setSearchQueueCapacity(-1) - .setWriteQueueCapacity(-1).build(); + this.resourceMap = new HashMap<>(); } public String getNodeID() { @@ -83,19 +82,25 @@ public List getHotShardSummaryList() { } public void appendNestedSummary(HotResourceSummary summary) { - hotResourceSummaryList.add(summary); + if (summary != null) { + hotResourceSummaryList.add(summary); + resourceMap.put(summary.getResource(), summary); + } } public void appendNestedSummary(HotShardSummary summary) { hotShardSummaryList.add(summary); } - public void setPerformanceControllerConfiguration(PerformanceControllerConfiguration performanceControllerConfiguration) { - this.performanceControllerConfiguration = performanceControllerConfiguration; - } - - public PerformanceControllerConfiguration getPerformanceControllerConfiguration() { - return performanceControllerConfiguration; + /** + * read the HotResourceSummary that is tied to the given resource type + * from the resource summary list + * @param resource Resource type + * @return the resource summary object that is tied to the given resource type + */ + @Nullable + public HotResourceSummary getResourceSummary(Resource resource) { + return resourceMap.get(resource); } @Override @@ -111,10 +116,6 @@ public HotNodeSummaryMessage buildSummaryMessage() { summaryMessageBuilder.getHotShardSummaryListBuilder() .addHotShardSummary(hotShardSummary.buildSummaryMessage()); } - if (performanceControllerConfiguration != null) { - summaryMessageBuilder.getPerformanceControllerConfigurationBuilder().mergeFrom( - performanceControllerConfiguration); - } return summaryMessageBuilder.build(); } @@ -138,7 +139,6 @@ public static HotNodeSummary buildHotNodeSummaryFromMessage(HotNodeSummaryMessag message.getHotShardSummaryList().getHotShardSummary(i))); } } - newSummary.setPerformanceControllerConfiguration(message.getPerformanceControllerConfiguration()); return newSummary; } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java index 259f835a6..1be234b5c 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java @@ -58,6 +58,12 @@ public class ResourceUtil { public static final Resource SEARCH_QUEUE_REJECTION = Resource.newBuilder() .setResourceEnum(ResourceEnum.SEARCH_THREADPOOL) .setMetricEnum(MetricEnum.QUEUE_REJECTION).build(); + public static final Resource WRITE_QUEUE_CAPACITY = Resource.newBuilder() + .setResourceEnum(ResourceEnum.WRITE_THREADPOOL) + .setMetricEnum(MetricEnum.QUEUE_CAPACITY).build(); + public static final Resource SEARCH_QUEUE_CAPACITY = Resource.newBuilder() + .setResourceEnum(ResourceEnum.SEARCH_THREADPOOL) + .setMetricEnum(MetricEnum.QUEUE_CAPACITY).build(); /** * Read the resourceType name from the ResourceType object diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/PerformanceControllerConfigCollector.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigCollector.java similarity index 71% rename from src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/PerformanceControllerConfigCollector.java rename to src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigCollector.java index 59803f14d..90b5324c1 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/PerformanceControllerConfigCollector.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigCollector.java @@ -2,53 +2,63 @@ import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.PerformanceControllerConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.EsConfigNode; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.NodeConfigFlowUnit; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_QueueCapacity; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; +import java.util.HashMap; +import java.util.function.Function; +import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jooq.Record; +import org.jooq.Result; /** * This is a node level collector in RCA graph which collect the current config settings of PerformanceControllor * PerformanceController is a ES plugin that helps with cache/queue auto tuning and this collector collect configs * set by PerformanceController and pass them down to Decision Maker for the next round of resource auto-tuning. */ -public class PerformanceControllerConfigCollector extends EsConfigNode { +public class NodeConfigCollector extends EsConfigNode { - private static final Logger LOG = LogManager.getLogger(PerformanceControllerConfigCollector.class); - private final Metric threadPool_queueCapacity; + private static final Logger LOG = LogManager.getLogger(NodeConfigCollector.class); + private final ThreadPool_QueueCapacity threadPool_queueCapacity; private final int rcaPeriod; private int counter; - private int writeQueueCapacity; - private int searchQueueCapacity; + private final HashMap configResult; - public PerformanceControllerConfigCollector(int rcaPeriod, M threadPool_queueCapacity) { + public NodeConfigCollector(int rcaPeriod, ThreadPool_QueueCapacity threadPool_queueCapacity) { this.threadPool_queueCapacity = threadPool_queueCapacity; this.rcaPeriod = rcaPeriod; this.counter = 0; - this.writeQueueCapacity = -1; - this.searchQueueCapacity = -1; + this.configResult = new HashMap<>(); } private void collectQueueCapacity(MetricFlowUnit flowUnit) { double writeQueueCapacity = SQLParsingUtil.readDataFromSqlResult(flowUnit.getData(), THREAD_POOL_TYPE.getField(), ThreadPoolType.WRITE.toString(), MetricsDB.MAX); if (!Double.isNaN(writeQueueCapacity)) { - this.writeQueueCapacity = (int) writeQueueCapacity; + configResult.put(ResourceUtil.WRITE_QUEUE_CAPACITY, writeQueueCapacity); + } + else { + LOG.error("write queue capacity is NaN"); } double searchQueueCapacity = SQLParsingUtil.readDataFromSqlResult(flowUnit.getData(), THREAD_POOL_TYPE.getField(), ThreadPoolType.SEARCH.toString(), MetricsDB.MAX); if (!Double.isNaN(searchQueueCapacity)) { - this.searchQueueCapacity = (int) searchQueueCapacity; + configResult.put(ResourceUtil.SEARCH_QUEUE_CAPACITY, searchQueueCapacity); + } + else { + LOG.error("search queue capacity is NaN"); } } @@ -59,7 +69,7 @@ private void collectQueueCapacity(MetricFlowUnit flowUnit) { * @return ResourceFlowUnit with HotNodeSummary. And HotNodeSummary carries PerformanceControllerConfiguration */ @Override - public ResourceFlowUnit operate() { + public NodeConfigFlowUnit operate() { counter += 1; for (MetricFlowUnit flowUnit : threadPool_queueCapacity.getFlowUnits()) { if (flowUnit.isEmpty()) { @@ -72,14 +82,11 @@ public ResourceFlowUnit operate() { ClusterDetailsEventProcessor.NodeDetails currentNode = ClusterDetailsEventProcessor .getCurrentNodeDetails(); HotNodeSummary nodeSummary = new HotNodeSummary(currentNode.getId(), currentNode.getHostAddress()); - nodeSummary.setPerformanceControllerConfiguration(PerformanceControllerConfiguration.newBuilder() - .setWriteQueueCapacity(writeQueueCapacity) - .setSearchQueueCapacity(searchQueueCapacity) - .build()); - return new ResourceFlowUnit<>(System.currentTimeMillis(), new ResourceContext(Resources.State.HEALTHY), nodeSummary); + configResult.values().forEach(nodeSummary::appendNestedSummary); + return new NodeConfigFlowUnit(System.currentTimeMillis(), nodeSummary); } else { - return new ResourceFlowUnit<>(System.currentTimeMillis()); + return new NodeConfigFlowUnit(System.currentTimeMillis()); } } } diff --git a/src/main/proto/inter_node_rpc_service.proto b/src/main/proto/inter_node_rpc_service.proto index b4c47c241..c1e05e8ea 100644 --- a/src/main/proto/inter_node_rpc_service.proto +++ b/src/main/proto/inter_node_rpc_service.proto @@ -84,6 +84,7 @@ enum MetricEnum { // threadpool QUEUE_REJECTION = 6 [(additional_fields).name = "queue rejection", (additional_fields).description = "rejection period in second"]; + QUEUE_CAPACITY = 7 [(additional_fields).name = "queue capacity", (additional_fields).description = "max capacity of the queue"]; } /* @@ -136,16 +137,11 @@ message HotShardSummaryMessage { double io_sys_callrate_threshold = 9; int32 timePeriod = 10; } -message PerformanceControllerConfiguration { - int32 writeQueueCapacity = 1; - int32 searchQueueCapacity = 2; -} message HotNodeSummaryMessage { string nodeID = 1; string hostAddress = 2; HotResourceSummaryList hotResourceSummaryList = 3; HotShardSummaryList hotShardSummaryList = 4; - PerformanceControllerConfiguration performanceControllerConfiguration = 5; } message HotClusterSummaryMessage { int32 nodeCount = 1; diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/PerformanceControllerConfigCollectorTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigCollectorTest.java similarity index 86% rename from src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/PerformanceControllerConfigCollectorTest.java rename to src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigCollectorTest.java index e518beb59..d43fb1fd5 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/PerformanceControllerConfigCollectorTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigCollectorTest.java @@ -9,7 +9,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.MetricTestHelper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.remediation.PerformanceControllerConfigCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.remediation.NodeConfigCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessorTestHelper; import java.util.Arrays; import org.junit.Assert; @@ -18,15 +18,15 @@ import org.junit.experimental.categories.Category; @Category(GradleTaskForRca.class) -public class PerformanceControllerConfigCollectorTest { +public class NodeConfigCollectorTest { private MetricTestHelper threadPool_QueueCapacity; - private PerformanceControllerConfigCollector performanceControllerConfigCollector; + private NodeConfigCollector nodeConfigCollector; @Before public void init() throws Exception { threadPool_QueueCapacity = new MetricTestHelper(5); - performanceControllerConfigCollector = new PerformanceControllerConfigCollector(1, threadPool_QueueCapacity); + nodeConfigCollector = new NodeConfigCollector(1, threadPool_QueueCapacity); ClusterDetailsEventProcessorTestHelper clusterDetailsEventProcessorTestHelper = new ClusterDetailsEventProcessorTestHelper(); clusterDetailsEventProcessorTestHelper.addNodeDetails("node1", "127.0.0.0", false); clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent(); @@ -48,7 +48,7 @@ private void mockFlowUnits(int writeQueueCapacity, int searchQueueCapacity) { @Test public void testCapacityMetricNotExist() { threadPool_QueueCapacity.createEmptyFlowunit(); - ResourceFlowUnit flowUnit = performanceControllerConfigCollector.operate(); + ResourceFlowUnit flowUnit = nodeConfigCollector.operate(); Assert.assertFalse(flowUnit.isEmpty()); PerformanceControllerConfiguration performanceControllerConfiguration = flowUnit.getSummary().getPerformanceControllerConfiguration(); Assert.assertEquals(-1, performanceControllerConfiguration.getSearchQueueCapacity()); @@ -58,7 +58,7 @@ public void testCapacityMetricNotExist() { @Test public void testCapacityCollection() { mockFlowUnits(100, 200); - ResourceFlowUnit flowUnit = performanceControllerConfigCollector.operate(); + ResourceFlowUnit flowUnit = nodeConfigCollector.operate(); Assert.assertFalse(flowUnit.isEmpty()); PerformanceControllerConfiguration performanceControllerConfiguration = flowUnit.getSummary().getPerformanceControllerConfiguration(); Assert.assertEquals(200, performanceControllerConfiguration.getSearchQueueCapacity()); From 6505ff79fabbfec9163474908e654faedadf1714 Mon Sep 17 00:00:00 2001 From: Ruizhen Date: Fri, 17 Jul 2020 16:28:54 -0700 Subject: [PATCH 4/5] Fix bugs and add UTs --- .../rca/framework/api/EsConfigNode.java | 2 +- .../api/flow_units/NodeConfigFlowUnit.java | 101 +++++++++++++----- .../api/metrics/ThreadPool_QueueCapacity.java | 17 ++- .../rca/store/rca/cluster/NodeKey.java | 5 + .../rca/remediation/NodeConfigCollector.java | 39 ++++--- .../api/metrics/MetricTestHelper.java | 4 - .../flow_units/MetricFlowUnitTestHelper.java | 40 +++++++ .../remediation/NodeConfigCollectorTest.java | 61 ++++++----- 8 files changed, 192 insertions(+), 77 deletions(-) create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/flow_units/MetricFlowUnitTestHelper.java diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/EsConfigNode.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/EsConfigNode.java index 94eff735d..067e902bf 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/EsConfigNode.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/EsConfigNode.java @@ -31,7 +31,6 @@ public EsConfigNode() { @Override public void generateFlowUnitListFromLocal(FlowUnitOperationArgWrapper args) { long startTime = System.currentTimeMillis(); - NodeConfigFlowUnit result; try { result = this.operate(); @@ -73,5 +72,6 @@ public void handleNodeMuted() { */ @Override public void persistFlowUnit(FlowUnitOperationArgWrapper args) { + assert true; } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/flow_units/NodeConfigFlowUnit.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/flow_units/NodeConfigFlowUnit.java index d98fbe9c1..c1f392e76 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/flow_units/NodeConfigFlowUnit.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/flow_units/NodeConfigFlowUnit.java @@ -1,36 +1,81 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage.SummaryOneofCase; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HotNodeSummaryMessage; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey; import java.util.HashMap; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +/** + * a flowunit type to carry ES node configurations (queue/cache capacities, etc.) + */ public class NodeConfigFlowUnit extends ResourceFlowUnit { - private static final Logger LOG = LogManager.getLogger(NodeConfigFlowUnit.class); private final HashMap configMap; public NodeConfigFlowUnit(long timeStamp) { super(timeStamp); + this.configMap = new HashMap<>(); } - public NodeConfigFlowUnit(long timeStamp, HotNodeSummary summary) { - super(timeStamp, new ResourceContext(Resources.State.HEALTHY), summary, false); - configMap = new HashMap<>(); + public NodeConfigFlowUnit(long timeStamp, NodeKey nodeKey) { + super(timeStamp, new ResourceContext(Resources.State.HEALTHY), null, false); + this.setSummary(new HotNodeSummary(nodeKey.getNodeId(), nodeKey.getHostAddress())); + this.configMap = new HashMap<>(); } + /** + * Add new config setting to flowunit + * @param resource config setting type + * @param value config setting value + */ public void addConfig(Resource resource, double value) { HotResourceSummary configSummary = new HotResourceSummary(resource, Double.NaN, value, 0); configMap.put(resource, configSummary); } + /** + * Add new config setting to flowunit + * @param configSummary config setting summary object + */ + public void addConfig(HotResourceSummary configSummary) { + configMap.put(configSummary.getResource(), configSummary); + } + + /** + * check if the config setting exist in flowunit + * @param resource config setting type + * @return if config exist + */ + public boolean hasConfig(Resource resource) { + return configMap.containsKey(resource); + } + + /** + * read the config value of the config setting from flowunit + * @param resource config setting type + * @return config setting value + */ public double readConfig(Resource resource) { HotResourceSummary configSummary = configMap.getOrDefault(resource, null); if (configSummary == null) { @@ -39,33 +84,33 @@ public double readConfig(Resource resource) { return configSummary.getValue(); } + @Override + public boolean isEmpty() { + return configMap.isEmpty(); + } + /** - * parse the "oneof" section in protocol buffer call the corresponding object build function for - * each summary type + * build NodeConfigFlowUnit from the protobuf message */ public static NodeConfigFlowUnit buildFlowUnitFromWrapper(final FlowUnitMessage message) { - //if the flowunit is empty. empty flowunit does not have context - if (message.hasResourceContext()) { - ResourceContext newContext = ResourceContext - .buildResourceContextFromMessage(message.getResourceContext()); - HotNodeSummary newSummary = null; - try { - if (message.getSummaryOneofCase() == SummaryOneofCase.HOTNODESUMMARY) { - newSummary = HotNodeSummary - .buildHotNodeSummaryFromMessage(message.getHotNodeSummary()); - } else { - throw new IllegalArgumentException(); + NodeConfigFlowUnit nodeConfigFlowUnit; + if (message.getSummaryOneofCase() == SummaryOneofCase.HOTNODESUMMARY) { + HotNodeSummaryMessage nodeSummaryMessage = message.getHotNodeSummary(); + NodeKey nodeKey = new NodeKey(nodeSummaryMessage.getNodeID(), + nodeSummaryMessage.getHostAddress()); + nodeConfigFlowUnit = new NodeConfigFlowUnit(message.getTimeStamp(), nodeKey); + if (nodeSummaryMessage.hasHotResourceSummaryList()) { + for (int i = 0; + i < nodeSummaryMessage.getHotResourceSummaryList().getHotResourceSummaryCount(); i++) { + nodeConfigFlowUnit.addConfig( + HotResourceSummary.buildHotResourceSummaryFromMessage( + nodeSummaryMessage.getHotResourceSummaryList().getHotResourceSummary(i)) + ); } - } catch (Exception e) { - // we are not supposed to run into this unless we specified wrong summary template - // for this function. Make sure the summary type passed in as template are consistent - // between serialization and de-serializing. - LOG.error("RCA: casting to wrong summary type when de-serializing this flowunit"); } - return new NodeConfigFlowUnit(message.getTimeStamp(), newSummary); } else { - //empty flowunit; - return new NodeConfigFlowUnit(message.getTimeStamp()); + nodeConfigFlowUnit = new NodeConfigFlowUnit(message.getTimeStamp()); } + return nodeConfigFlowUnit; } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/ThreadPool_QueueCapacity.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/ThreadPool_QueueCapacity.java index 563136ca3..d4781e3eb 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/ThreadPool_QueueCapacity.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/ThreadPool_QueueCapacity.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; @@ -5,6 +20,6 @@ public class ThreadPool_QueueCapacity extends Metric { public ThreadPool_QueueCapacity() { - super(AllMetrics.ThreadPoolValue.THREADPOOL_QUEUE_CAPACITY.name(), 5); + super(AllMetrics.ThreadPoolValue.THREADPOOL_QUEUE_CAPACITY.toString(), 5); } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/NodeKey.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/NodeKey.java index bb37997a4..4725297e6 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/NodeKey.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/NodeKey.java @@ -15,6 +15,7 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails; import org.apache.commons.lang3.builder.HashCodeBuilder; public class NodeKey { @@ -26,6 +27,10 @@ public NodeKey(String nodeId, String hostAddress) { this.hostAddress = hostAddress; } + public NodeKey(InstanceDetails instanceDetails) { + this(instanceDetails.getInstanceId(), instanceDetails.getInstanceIp()); + } + public String getNodeId() { return nodeId; } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigCollector.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigCollector.java index 90b5324c1..a05f0e4ce 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigCollector.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/remediation/NodeConfigCollector.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.remediation; import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE; @@ -6,27 +21,19 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.EsConfigNode; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.NodeConfigFlowUnit; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.NodeConfigFlowUnit; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_QueueCapacity; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey; import java.util.HashMap; -import java.util.function.Function; -import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.jooq.Record; -import org.jooq.Result; /** - * This is a node level collector in RCA graph which collect the current config settings of PerformanceControllor - * PerformanceController is a ES plugin that helps with cache/queue auto tuning and this collector collect configs - * set by PerformanceController and pass them down to Decision Maker for the next round of resource auto-tuning. + * This is a node level collector in RCA graph which collect the current config settings from ES (queue/cache capacity etc.) + * And pass them down to Decision Maker for the next round of resource auto-tuning. */ public class NodeConfigCollector extends EsConfigNode { @@ -79,11 +86,9 @@ public NodeConfigFlowUnit operate() { } if (counter == rcaPeriod) { counter = 0; - ClusterDetailsEventProcessor.NodeDetails currentNode = ClusterDetailsEventProcessor - .getCurrentNodeDetails(); - HotNodeSummary nodeSummary = new HotNodeSummary(currentNode.getId(), currentNode.getHostAddress()); - configResult.values().forEach(nodeSummary::appendNestedSummary); - return new NodeConfigFlowUnit(System.currentTimeMillis(), nodeSummary); + NodeConfigFlowUnit flowUnits = new NodeConfigFlowUnit(System.currentTimeMillis(), new NodeKey(getInstanceDetails())); + configResult.forEach(flowUnits::addConfig); + return flowUnits; } else { return new NodeConfigFlowUnit(System.currentTimeMillis()); diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/MetricTestHelper.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/MetricTestHelper.java index b85235819..d8fcddde3 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/MetricTestHelper.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/metrics/MetricTestHelper.java @@ -42,10 +42,6 @@ public MetricTestHelper(long evaluationIntervalSeconds, String name) { context = DSL.using(new MockConnection(Mock.of(0))); } - public void createEmptyFlowunit() { - this.flowUnits = Collections.singletonList(MetricFlowUnit.generic()); - } - public void createTestFlowUnits(final List fieldName, final List row) { List stringData = new ArrayList<>(); stringData.add(fieldName.toArray(new String[0])); diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/flow_units/MetricFlowUnitTestHelper.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/flow_units/MetricFlowUnitTestHelper.java new file mode 100644 index 000000000..698682e52 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/flow_units/MetricFlowUnitTestHelper.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.flow_units; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit; +import java.util.ArrayList; +import java.util.List; +import org.jooq.DSLContext; +import org.jooq.Record; +import org.jooq.Result; +import org.jooq.impl.DSL; +import org.jooq.tools.jdbc.Mock; +import org.jooq.tools.jdbc.MockConnection; + +public class MetricFlowUnitTestHelper { + + public static MetricFlowUnit createFlowUnit(final List fieldName, final List... rows) { + DSLContext context = DSL.using(new MockConnection(Mock.of(0))); + List stringData = new ArrayList<>(); + stringData.add(fieldName.toArray(new String[0])); + for (int i = 0; i < rows.length; i++) { + stringData.add(rows[i].toArray(new String[0])); + } + Result newRecordList = context.fetchFromStringData(stringData); + return new MetricFlowUnit(0, newRecordList); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigCollectorTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigCollectorTest.java index d43fb1fd5..c8c520a9c 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigCollectorTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/remediation/NodeConfigCollectorTest.java @@ -2,16 +2,20 @@ import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.PerformanceControllerConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.GradleTaskForRca; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.MetricTestHelper; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.NodeConfigFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_QueueCapacity; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.flow_units.MetricFlowUnitTestHelper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.remediation.NodeConfigCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessorTestHelper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; import java.util.Arrays; +import java.util.Collections; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -20,48 +24,53 @@ @Category(GradleTaskForRca.class) public class NodeConfigCollectorTest { - private MetricTestHelper threadPool_QueueCapacity; + private ThreadPool_QueueCapacity threadPool_QueueCapacity; private NodeConfigCollector nodeConfigCollector; @Before - public void init() throws Exception { - threadPool_QueueCapacity = new MetricTestHelper(5); + public void init() { + threadPool_QueueCapacity = new ThreadPool_QueueCapacity(); nodeConfigCollector = new NodeConfigCollector(1, threadPool_QueueCapacity); - ClusterDetailsEventProcessorTestHelper clusterDetailsEventProcessorTestHelper = new ClusterDetailsEventProcessorTestHelper(); - clusterDetailsEventProcessorTestHelper.addNodeDetails("node1", "127.0.0.0", false); - clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent(); + + ClusterDetailsEventProcessor clusterDetailsEventProcessor = new ClusterDetailsEventProcessor(); + ClusterDetailsEventProcessor.NodeDetails node1 = + new ClusterDetailsEventProcessor.NodeDetails(AllMetrics.NodeRole.DATA, "node1", "127.0.0.0", false); + clusterDetailsEventProcessor.setNodesDetails(Collections.singletonList(node1)); + AppContext appContext = new AppContext(); + appContext.setClusterDetailsEventProcessor(clusterDetailsEventProcessor); + nodeConfigCollector.setAppContext(appContext); } /** * generate flowunit and bind the flowunits it generate to metrics */ + @SuppressWarnings("unchecked") private void mockFlowUnits(int writeQueueCapacity, int searchQueueCapacity) { - threadPool_QueueCapacity.createTestFlowUnitsWithMultipleRows( + MetricFlowUnit flowUnit = MetricFlowUnitTestHelper.createFlowUnit( Arrays.asList(THREAD_POOL_TYPE.toString(), MetricsDB.MAX), - Arrays.asList( - Arrays.asList(ThreadPoolType.WRITE.toString(), String.valueOf(writeQueueCapacity)), - Arrays.asList(ThreadPoolType.SEARCH.toString(), String.valueOf(searchQueueCapacity)) - ) + Arrays.asList(ThreadPoolType.WRITE.toString(), String.valueOf(writeQueueCapacity)), + Arrays.asList(ThreadPoolType.SEARCH.toString(), String.valueOf(searchQueueCapacity)) ); + threadPool_QueueCapacity.setLocalFlowUnit(flowUnit); } @Test public void testCapacityMetricNotExist() { - threadPool_QueueCapacity.createEmptyFlowunit(); - ResourceFlowUnit flowUnit = nodeConfigCollector.operate(); - Assert.assertFalse(flowUnit.isEmpty()); - PerformanceControllerConfiguration performanceControllerConfiguration = flowUnit.getSummary().getPerformanceControllerConfiguration(); - Assert.assertEquals(-1, performanceControllerConfiguration.getSearchQueueCapacity()); - Assert.assertEquals(-1, performanceControllerConfiguration.getWriteQueueCapacity()); + threadPool_QueueCapacity.setLocalFlowUnit(MetricFlowUnit.generic()); + NodeConfigFlowUnit flowUnit = nodeConfigCollector.operate(); + Assert.assertTrue(flowUnit.isEmpty()); + Assert.assertFalse(flowUnit.hasConfig(ResourceUtil.SEARCH_QUEUE_CAPACITY)); + Assert.assertFalse(flowUnit.hasConfig(ResourceUtil.WRITE_QUEUE_CAPACITY)); } @Test public void testCapacityCollection() { mockFlowUnits(100, 200); - ResourceFlowUnit flowUnit = nodeConfigCollector.operate(); + NodeConfigFlowUnit flowUnit = nodeConfigCollector.operate(); Assert.assertFalse(flowUnit.isEmpty()); - PerformanceControllerConfiguration performanceControllerConfiguration = flowUnit.getSummary().getPerformanceControllerConfiguration(); - Assert.assertEquals(200, performanceControllerConfiguration.getSearchQueueCapacity()); - Assert.assertEquals(100, performanceControllerConfiguration.getWriteQueueCapacity()); + Assert.assertTrue(flowUnit.hasConfig(ResourceUtil.SEARCH_QUEUE_CAPACITY)); + Assert.assertEquals(200, flowUnit.readConfig(ResourceUtil.SEARCH_QUEUE_CAPACITY), 0.01); + Assert.assertTrue(flowUnit.hasConfig(ResourceUtil.WRITE_QUEUE_CAPACITY)); + Assert.assertEquals(100, flowUnit.readConfig(ResourceUtil.WRITE_QUEUE_CAPACITY), 0.01); } } From 97238e9b2a76c53df8f0b9b44a608006abac0f45 Mon Sep 17 00:00:00 2001 From: Ruizhen Date: Sun, 19 Jul 2020 12:01:13 -0700 Subject: [PATCH 5/5] remove unnecessary code --- .../framework/api/summaries/HotNodeSummary.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java index a800463a2..901775851 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummary.java @@ -17,7 +17,6 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HotNodeSummaryMessage; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.JooqFieldValue; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.google.gson.JsonElement; @@ -25,7 +24,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import javax.annotation.Nullable; import org.apache.logging.log4j.LogManager; @@ -54,7 +52,6 @@ public class HotNodeSummary extends GenericSummary { private final String hostAddress; private List hotResourceSummaryList; private List hotShardSummaryList; - private final HashMap resourceMap; public HotNodeSummary(String nodeID, String hostAddress) { super(); @@ -62,7 +59,6 @@ public HotNodeSummary(String nodeID, String hostAddress) { this.hostAddress = hostAddress; this.hotResourceSummaryList = new ArrayList<>(); this.hotShardSummaryList = new ArrayList<>(); - this.resourceMap = new HashMap<>(); } public String getNodeID() { @@ -84,7 +80,6 @@ public List getHotShardSummaryList() { public void appendNestedSummary(HotResourceSummary summary) { if (summary != null) { hotResourceSummaryList.add(summary); - resourceMap.put(summary.getResource(), summary); } } @@ -92,17 +87,6 @@ public void appendNestedSummary(HotShardSummary summary) { hotShardSummaryList.add(summary); } - /** - * read the HotResourceSummary that is tied to the given resource type - * from the resource summary list - * @param resource Resource type - * @return the resource summary object that is tied to the given resource type - */ - @Nullable - public HotResourceSummary getResourceSummary(Resource resource) { - return resourceMap.get(resource); - } - @Override public HotNodeSummaryMessage buildSummaryMessage() { final HotNodeSummaryMessage.Builder summaryMessageBuilder = HotNodeSummaryMessage.newBuilder();