From b6f1a51eb78abad87b3bbd4a6be27c05284c4ff4 Mon Sep 17 00:00:00 2001 From: khushbr Date: Wed, 1 Jul 2020 14:17:01 -0700 Subject: [PATCH] Including the Protobuf re-factoring changes --- INSTALL.md | 12 + build.gradle | 5 + pa_bin/performance-analyzer-agent | 4 +- .../api/summaries/HotResourceSummary.java | 52 ++-- .../api/summaries/ResourceTypeUtil.java | 120 ------- .../framework/api/summaries/ResourceUtil.java | 95 ++++++ .../rca/framework/core/GenericContext.java | 4 + .../store/rca/HighHeapUsageClusterRca.java | 7 +- .../rca/store/rca/HotNodeClusterRca.java | 13 +- .../rca/store/rca/cluster/BaseClusterRca.java | 237 ++++++++++++++ .../rca/store/rca/cluster/NodeKey.java | 62 ++++ .../rca/cluster/QueueRejectionClusterRca.java | 28 ++ .../rca/hot_node/GenericResourceRca.java | 12 +- .../rca/store/rca/hot_node/HighCpuRca.java | 6 +- .../rca/hotheap/HighHeapUsageOldGenRca.java | 7 +- .../rca/hotheap/HighHeapUsageYoungGenRca.java | 9 +- .../store/rca/hotheap/NodeStatAggregator.java | 8 +- .../rca/hotshard/HotShardClusterRca.java | 16 +- .../rca/store/rca/hotshard/HotShardRca.java | 8 +- .../rca/threadpool/QueueRejectionRca.java | 16 +- src/main/proto/inter_node_rpc_service.proto | 54 ++-- .../rca/framework/api/RcaTestHelper.java | 51 ++- .../api/summaries/HotClusterSummaryTest.java | 1 - .../api/summaries/HotNodeSummaryTest.java | 6 +- .../api/summaries/HotResourceSummaryTest.java | 68 ++-- .../api/summaries/HotShardSummaryTest.java | 2 - .../api/summaries/ResourceTypeUtilTest.java | 71 ----- .../api/summaries/ResourceUtilTest.java | 30 ++ .../PersistFlowUnitAndSummaryTest.java | 5 +- .../rca/persistence/SQLitePersistorTest.java | 5 +- ...lusterDetailsEventProcessorTestHelper.java | 5 + .../rca/HighHeapUsageClusterRcaTest.java | 27 +- .../store/rca/HotNodeClusterRcaTest.java | 32 +- .../store/rca/cluster/BaseClusterRcaTest.java | 294 ++++++++++++++++++ .../rca/hotshard/HotShardClusterRcaTest.java | 49 ++- .../rca/threadpool/QueueRejectionRcaTest.java | 8 +- 36 files 
changed, 1014 insertions(+), 415 deletions(-) delete mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceTypeUtil.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/BaseClusterRca.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/NodeKey.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/QueueRejectionClusterRca.java delete mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceTypeUtilTest.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtilTest.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/cluster/BaseClusterRcaTest.java diff --git a/INSTALL.md b/INSTALL.md index 754d183ae..3ffb60b44 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -33,6 +33,18 @@ This document walks you through the process of building and deploying the RCA fr a. Launch IntelliJ IDEA b. Choose Import Project and select the `build.gradle` file in the root of this package + + 5. (Optional) TLS Setup + + a. Open pa_config/performance-analyzer.properties + + b. Modify the certificate-file-path, private-key-file-path, and https-enabled entries + + c. Example performance-analyzer.properties: + + certificate-file-path = /etc/ssl/certs/example.com.crt + private-key-file-path = /home/myUser/.ssh/id_rsa + https-enabled = true ### Build RCA framework This package uses the [Gradle](https://docs.gradle.org/current/userguide/userguide.html) build system. 
Gradle comes with excellent documentation that should be your first stop when trying to figure out how to operate or modify the build. diff --git a/build.gradle b/build.gradle index 00ffef6a1..3e13baf92 100644 --- a/build.gradle +++ b/build.gradle @@ -108,6 +108,11 @@ spotbugsTest { ignoreFailures = true } +check { + dependsOn spotbugsMain + dependsOn spotbugsTest +} + jacoco { toolVersion = "0.8.5" } diff --git a/pa_bin/performance-analyzer-agent b/pa_bin/performance-analyzer-agent index 97ab2ea9e..ac2894bda 100755 --- a/pa_bin/performance-analyzer-agent +++ b/pa_bin/performance-analyzer-agent @@ -24,11 +24,11 @@ else fi if ! echo $* | grep -E '(^-d |-d$| -d |--daemonize$|--daemonize )' > /dev/null; then - export JAVA_OPTS=-Des.path.home=$ES_HOME\ -Dlog4j.configurationFile=$ES_HOME/performance-analyzer-rca/pa_config/log4j2.xml + export JAVA_OPTS=-Des.path.home=$ES_HOME\ -Dlog4j.configurationFile=$ES_HOME/performance-analyzer-rca/pa_config/log4j2.xml\ -DconfigFilePath=$ES_HOME/performance-analyzer-rca/pa_config/performance-analyzer.properties exec $ES_HOME/performance-analyzer-rca/bin/performance-analyzer-rca else echo 'Starting deamon' - export JAVA_OPTS=-Des.path.home=$ES_HOME\ -Dlog4j.configurationFile=$ES_HOME/performance-analyzer-rca/pa_config/log4j2.xml + export JAVA_OPTS=-Des.path.home=$ES_HOME\ -Dlog4j.configurationFile=$ES_HOME/performance-analyzer-rca/pa_config/log4j2.xml\ -DconfigFilePath=$ES_HOME/performance-analyzer-rca/pa_config/performance-analyzer.properties exec $ES_HOME/performance-analyzer-rca/bin/performance-analyzer-rca & pid=$! 
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotResourceSummary.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotResourceSummary.java index 688ec6a25..3eed8fba6 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotResourceSummary.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotResourceSummary.java @@ -17,14 +17,12 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HotResourceSummaryMessage; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.JooqFieldValue; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import javax.annotation.Nullable; @@ -52,7 +50,7 @@ public class HotResourceSummary extends GenericSummary { public static final String HOT_RESOURCE_SUMMARY_TABLE = HotResourceSummary.class.getSimpleName(); private static final Logger LOG = LogManager.getLogger(HotResourceSummary.class); - private final ResourceType resourceType; + private final Resource resource; private double threshold; private double value; private double avgValue; @@ -62,10 +60,10 @@ public class HotResourceSummary extends GenericSummary { private String metaData; private List topConsumerSummaryList; - public HotResourceSummary(ResourceType resourceType, double threshold, + public 
HotResourceSummary(Resource resource, double threshold, double value, int timePeriod) { super(); - this.resourceType = resourceType; + this.resource = resource; this.threshold = threshold; this.value = value; @@ -77,10 +75,10 @@ public HotResourceSummary(ResourceType resourceType, double threshold, this.topConsumerSummaryList = new ArrayList<>(); } - public HotResourceSummary(ResourceType resourceType, double threshold, + public HotResourceSummary(Resource resource, double threshold, double value, int timePeriod, String metaData) { super(); - this.resourceType = resourceType; + this.resource = resource; this.threshold = threshold; this.value = value; @@ -98,8 +96,8 @@ public void setValueDistribution(double minValue, double maxValue, double avgVal this.avgValue = avgValue; } - public ResourceType getResourceType() { - return this.resourceType; + public Resource getResource() { + return this.resource; } public double getValue() { @@ -130,7 +128,7 @@ public void appendNestedSummary(TopConsumerSummary summary) { public HotResourceSummaryMessage buildSummaryMessage() { final HotResourceSummaryMessage.Builder summaryMessageBuilder = HotResourceSummaryMessage .newBuilder(); - summaryMessageBuilder.setResourceType(this.resourceType); + summaryMessageBuilder.setResource(this.resource); summaryMessageBuilder.setThreshold(this.threshold); summaryMessageBuilder.setValue(this.value); summaryMessageBuilder.setAvgValue(this.avgValue); @@ -151,7 +149,7 @@ public void buildSummaryMessageAndAddToFlowUnit(FlowUnitMessage.Builder messageB } public static HotResourceSummary buildHotResourceSummaryFromMessage(HotResourceSummaryMessage message) { - HotResourceSummary newSummary = new HotResourceSummary(message.getResourceType(), message.getThreshold(), + HotResourceSummary newSummary = new HotResourceSummary(message.getResource(), message.getThreshold(), message.getValue(), message.getTimePeriod()); newSummary.setValueDistribution(message.getMinValue(), message.getMaxValue(), 
message.getAvgValue()); if (message.hasConsumers()) { @@ -166,14 +164,14 @@ public static HotResourceSummary buildHotResourceSummaryFromMessage(HotResourceS @Override public String toString() { return new StringBuilder() - .append(ResourceTypeUtil.getResourceTypeName(this.resourceType)) + .append(ResourceUtil.getResourceTypeName(resource)) + .append(" ") + .append(ResourceUtil.getResourceMetricName(resource)) .append(" ") .append(this.threshold) .append(" ") .append(this.value) .append(" ") - .append(ResourceTypeUtil.getResourceTypeUnit(this.resourceType)) - .append(" ") .append(this.topConsumerSummaryList) .append(" ") .append(this.metaData) @@ -189,12 +187,12 @@ public String getTableName() { public List> getSqlSchema() { List> schema = new ArrayList<>(); schema.add(ResourceSummaryField.RESOURCE_TYPE_FIELD.getField()); + schema.add(ResourceSummaryField.RESOURCE_METRIC_FIELD.getField()); schema.add(ResourceSummaryField.THRESHOLD_FIELD.getField()); schema.add(ResourceSummaryField.VALUE_FIELD.getField()); schema.add(ResourceSummaryField.AVG_VALUE_FIELD.getField()); schema.add(ResourceSummaryField.MIN_VALUE_FIELD.getField()); schema.add(ResourceSummaryField.MAX_VALUE_FIELD.getField()); - schema.add(ResourceSummaryField.UNIT_TYPE_FIELD.getField()); schema.add(ResourceSummaryField.TIME_PERIOD_FIELD.getField()); schema.add(ResourceSummaryField.METADATA_FIELD.getField()); return schema; @@ -203,13 +201,13 @@ public List> getSqlSchema() { @Override public List getSqlValue() { List value = new ArrayList<>(); - value.add(ResourceTypeUtil.getResourceTypeName(this.resourceType)); + value.add(resource.getResourceEnumValue()); + value.add(resource.getMetricEnumValue()); value.add(Double.valueOf(this.threshold)); value.add(Double.valueOf(this.value)); value.add(Double.valueOf(this.avgValue)); value.add(Double.valueOf(this.minValue)); value.add(Double.valueOf(this.maxValue)); - value.add(ResourceTypeUtil.getResourceTypeUnit(this.resourceType)); 
value.add(Integer.valueOf(this.timePeriod)); value.add(metaData); return value; @@ -223,14 +221,14 @@ public List getSqlValue() { public JsonElement toJson() { JsonObject summaryObj = new JsonObject(); summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.RESOURCE_TYPE_COL_NAME, - ResourceTypeUtil.getResourceTypeName(this.resourceType)); + ResourceUtil.getResourceTypeName(this.resource)); + summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.RESOURCE_METRIC_COL_NAME, + ResourceUtil.getResourceMetricName(this.resource)); summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.THRESHOLD_COL_NAME, this.threshold); summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.VALUE_COL_NAME, this.value); summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.AVG_VALUE_COL_NAME, this.avgValue); summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.MIN_VALUE_COL_NAME, this.minValue); summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.MAX_VALUE_COL_NAME, this.maxValue); - summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.UNIT_TYPE_COL_NAME, - ResourceTypeUtil.getResourceTypeUnit(this.resourceType)); summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.TIME_PERIOD_COL_NAME, this.timePeriod); summaryObj.addProperty(SQL_SCHEMA_CONSTANTS.META_DATA_COL_NAME, this.metaData); if (!getNestedSummaryList().isEmpty()) { @@ -268,12 +266,12 @@ public List getNestedSummaryTables() { public static class SQL_SCHEMA_CONSTANTS { public static final String RESOURCE_TYPE_COL_NAME = "resource_type"; + public static final String RESOURCE_METRIC_COL_NAME = "resource_metric"; public static final String THRESHOLD_COL_NAME = "threshold"; public static final String VALUE_COL_NAME = "value"; public static final String AVG_VALUE_COL_NAME = "avg"; public static final String MIN_VALUE_COL_NAME = "min"; public static final String MAX_VALUE_COL_NAME = "max"; - public static final String UNIT_TYPE_COL_NAME = "unit_type"; public static final String TIME_PERIOD_COL_NAME = "time_period_seconds"; public static final String META_DATA_COL_NAME = "meta_data"; } @@ -282,13 +280,13 @@ public static 
class SQL_SCHEMA_CONSTANTS { * Cluster summary SQL fields */ public enum ResourceSummaryField implements JooqFieldValue { - RESOURCE_TYPE_FIELD(SQL_SCHEMA_CONSTANTS.RESOURCE_TYPE_COL_NAME, String.class), + RESOURCE_TYPE_FIELD(SQL_SCHEMA_CONSTANTS.RESOURCE_TYPE_COL_NAME, Integer.class), + RESOURCE_METRIC_FIELD(SQL_SCHEMA_CONSTANTS.RESOURCE_METRIC_COL_NAME, Integer.class), THRESHOLD_FIELD(SQL_SCHEMA_CONSTANTS.THRESHOLD_COL_NAME, Double.class), VALUE_FIELD(SQL_SCHEMA_CONSTANTS.VALUE_COL_NAME, Double.class), AVG_VALUE_FIELD(SQL_SCHEMA_CONSTANTS.AVG_VALUE_COL_NAME, Double.class), MIN_VALUE_FIELD(SQL_SCHEMA_CONSTANTS.MIN_VALUE_COL_NAME, Double.class), MAX_VALUE_FIELD(SQL_SCHEMA_CONSTANTS.MAX_VALUE_COL_NAME, Double.class), - UNIT_TYPE_FIELD(SQL_SCHEMA_CONSTANTS.UNIT_TYPE_COL_NAME, String.class), TIME_PERIOD_FIELD(SQL_SCHEMA_CONSTANTS.TIME_PERIOD_COL_NAME, Integer.class), METADATA_FIELD(SQL_SCHEMA_CONSTANTS.META_DATA_COL_NAME, String.class); @@ -321,7 +319,8 @@ public String getName() { public static HotResourceSummary buildSummary(Record record) { HotResourceSummary summary = null; try { - String resourceTypeName = record.get(ResourceSummaryField.RESOURCE_TYPE_FIELD.getField(), String.class); + int resourceTypeEnumVal = record.get(ResourceSummaryField.RESOURCE_TYPE_FIELD.getField(), Integer.class); + int resourceMetricEnumVal = record.get(ResourceSummaryField.RESOURCE_METRIC_FIELD.getField(), Integer.class); Double threshold = record.get(ResourceSummaryField.THRESHOLD_FIELD.getField(), Double.class); Double value = record.get(ResourceSummaryField.VALUE_FIELD.getField(), Double.class); Double avgValue = record.get(ResourceSummaryField.AVG_VALUE_FIELD.getField(), Double.class); @@ -329,7 +328,8 @@ public static HotResourceSummary buildSummary(Record record) { Double maxValue = record.get(ResourceSummaryField.MAX_VALUE_FIELD.getField(), Double.class); Integer timePeriod = record.get(ResourceSummaryField.TIME_PERIOD_FIELD.getField(), Integer.class); String metaData = 
record.get(ResourceSummaryField.METADATA_FIELD.getField(), String.class); - summary = new HotResourceSummary(ResourceTypeUtil.buildResourceType(resourceTypeName), + summary = new HotResourceSummary( + ResourceUtil.buildResource(resourceTypeEnumVal, resourceMetricEnumVal), threshold, value, timePeriod, metaData); // those three fields are optional. check before setting to the obj if (avgValue != null diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceTypeUtil.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceTypeUtil.java deleted file mode 100644 index 4c2e56329..000000000 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceTypeUtil.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ - -package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries; - -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HardwareEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.JvmEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.PANetworking; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType.ResourceTypeOneofCase; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceTypeOptions; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ProtocolMessageEnum; -import javax.annotation.Nullable; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -/** - * A utility class to parse and build grpc ResourceType - */ -public class ResourceTypeUtil { - private static final Logger LOG = LogManager.getLogger(ResourceTypeUtil.class); - - static final String UNKNOWN_RESOURCE_TYPE_NAME = "unknown resource type"; - static final String UNKNOWN_RESOURCE_TYPE_UNIT = "unknown resource unit type"; - - /** - * Read the resourceType name from the ResourceType object - * @param resourceType grpc ResourceType object - * @return resource type name - */ - public static String getResourceTypeName(ResourceType resourceType) { - String resourceName = UNKNOWN_RESOURCE_TYPE_NAME; - ResourceTypeOptions resourceTypeOptions = ResourceTypeUtil.getResourceTypeOptions(resourceType); - if (resourceTypeOptions != null) { - resourceName = resourceTypeOptions.getResourceTypeName(); - } - return resourceName; - } - - /** - * Read the resourceType unit type from the ResourceType object - * @param resourceType grpc ResourceType object - * @return resource unit type - */ - public static String getResourceTypeUnit(ResourceType resourceType) { - String resourceName = UNKNOWN_RESOURCE_TYPE_UNIT; - 
ResourceTypeOptions resourceTypeOptions = ResourceTypeUtil.getResourceTypeOptions(resourceType); - if (resourceTypeOptions != null) { - resourceName = resourceTypeOptions.getResourceTypeUnit(); - } - return resourceName; - } - - private static ResourceTypeOptions getResourceTypeOptions(ResourceType resourceType) { - ProtocolMessageEnum resourceEnum; - if (resourceType == null) { - LOG.error("resourceType is null"); - return null; - } - if (resourceType.getResourceTypeOneofCase() == ResourceTypeOneofCase.JVM) { - resourceEnum = resourceType.getJVM(); - } - else if (resourceType.getResourceTypeOneofCase() == ResourceTypeOneofCase.HARDWARE_RESOURCE_TYPE) { - resourceEnum = resourceType.getHardwareResourceType(); - } - else { - LOG.error("unknown resource enum type"); - return null; - } - return resourceEnum.getValueDescriptor().getOptions() - .getExtension(PANetworking.resourceTypeOptions); - } - - @VisibleForTesting - static ResourceTypeOptions getResourceTypeOptions(ProtocolMessageEnum resourceEnum) { - return resourceEnum.getValueDescriptor().getOptions() - .getExtension(PANetworking.resourceTypeOptions); - } - - /** - * Map resourceTypeName to its enum object - * @param resourceTypeName The resourceTypeName field defined in protbuf. 
- * @return ResourceType enum object - */ - @Nullable - public static ResourceType buildResourceType(String resourceTypeName) { - ResourceType resourceType = null; - if (resourceTypeName == null) { - return resourceType; - } - ResourceType.Builder builder = null; - if (resourceTypeName.equals(ResourceTypeUtil.getResourceTypeOptions(JvmEnum.OLD_GEN).getResourceTypeName())) { - builder = ResourceType.newBuilder().setJVM(JvmEnum.OLD_GEN); - } - else if (resourceTypeName.equals(ResourceTypeUtil.getResourceTypeOptions(JvmEnum.YOUNG_GEN).getResourceTypeName())) { - builder = ResourceType.newBuilder().setJVM(JvmEnum.YOUNG_GEN); - } - else if (resourceTypeName.equals(ResourceTypeUtil.getResourceTypeOptions(HardwareEnum.CPU).getResourceTypeName())) { - builder = ResourceType.newBuilder().setHardwareResourceType(HardwareEnum.CPU); - } - if (builder != null) { - resourceType = builder.build(); - } - return resourceType; - } - -} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java new file mode 100644 index 000000000..259f835a6 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java @@ -0,0 +1,95 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.AdditionalFields; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.MetricEnum; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.PANetworking; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum; + +/** + * A utility class to parse and build the grpc Resource message. + * A Resource message consists of two parts: ResourceEnum and MetricEnum. + * Both enum types are defined in protobuf (src/main/proto/inter_node_rpc_service.proto) + *

+ * ResourceEnum : the different resource types on an instance : CPU, IO, CACHE, etc. + * MetricEnum : the metrics of each resource type, e.g. IO can have metrics + * such as TOTAL_THROUGHPUT and TOTAL_SYS_CALLRATE + */ +public class ResourceUtil { + + // JVM resource + public static final Resource OLD_GEN_HEAP_USAGE = Resource.newBuilder() + .setResourceEnum(ResourceEnum.OLD_GEN) + .setMetricEnum(MetricEnum.HEAP_USAGE).build(); + public static final Resource YOUNG_GEN_PROMOTION_RATE = Resource.newBuilder() + .setResourceEnum(ResourceEnum.YOUNG_GEN) + .setMetricEnum(MetricEnum.PROMOTION_RATE).build(); + + // hardware resource + public static final Resource CPU_USAGE = Resource.newBuilder() + .setResourceEnum(ResourceEnum.CPU) + .setMetricEnum(MetricEnum.CPU_USAGE).build(); + public static final Resource IO_TOTAL_THROUGHPUT = Resource.newBuilder() + .setResourceEnum(ResourceEnum.IO) + .setMetricEnum(MetricEnum.TOTAL_THROUGHPUT).build(); + public static final Resource IO_TOTAL_SYS_CALLRATE = Resource.newBuilder() + .setResourceEnum(ResourceEnum.IO) + .setMetricEnum(MetricEnum.TOTAL_SYS_CALLRATE).build(); + + // thread pool + public static final Resource WRITE_QUEUE_REJECTION = Resource.newBuilder() + .setResourceEnum(ResourceEnum.WRITE_THREADPOOL) + .setMetricEnum(MetricEnum.QUEUE_REJECTION).build(); + public static final Resource SEARCH_QUEUE_REJECTION = Resource.newBuilder() + .setResourceEnum(ResourceEnum.SEARCH_THREADPOOL) + .setMetricEnum(MetricEnum.QUEUE_REJECTION).build(); + + /** + * Read the resource type name from the Resource object + * @param resource grpc Resource object + * @return resource type name + */ + public static String getResourceTypeName(Resource resource) { + return resource.getResourceEnum().getValueDescriptor().getOptions() + .getExtension(PANetworking.additionalFields).getName(); + } + + /** + * Read the metric name, followed by its description in parentheses, from the Resource object + * @param resource grpc Resource object + * @return metric name and description, formatted as name(description) + */ + public static
String getResourceMetricName(Resource resource) { + AdditionalFields resourceMetricOptions = resource.getMetricEnum().getValueDescriptor().getOptions() + .getExtension(PANetworking.additionalFields); + return resourceMetricOptions.getName() + "(" + resourceMetricOptions.getDescription() + ")"; + } + + /** + * Build a Resource object from the numeric values of its two enums + * @param resourceEnumValue resource enum value + * @param metricEnumValue metric enum value + * @return Resource object carrying the given resource and metric enum values + */ + public static Resource buildResource(int resourceEnumValue, int metricEnumValue) { + Resource.Builder builder = Resource.newBuilder(); + builder.setResourceEnumValue(resourceEnumValue); + builder.setMetricEnumValue(metricEnumValue); + return builder.build(); + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/GenericContext.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/GenericContext.java index 2efe145fa..246fd316d 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/GenericContext.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/core/GenericContext.java @@ -32,6 +32,10 @@ public boolean isUnhealthy() { return this.state == Resources.State.UNHEALTHY || this.state == Resources.State.CONTENDED; } + public boolean isHealthy() { + return this.state == Resources.State.HEALTHY; + } + public boolean isUnknown() { return this.state == Resources.State.UNKNOWN; } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/HighHeapUsageClusterRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/HighHeapUsageClusterRca.java index 68d1456b3..f28ae6ed2 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/HighHeapUsageClusterRca.java +++
b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/HighHeapUsageClusterRca.java @@ -16,7 +16,7 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.JvmEnum; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext; @@ -24,7 +24,6 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaVerticesMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; @@ -91,10 +90,10 @@ private List getUnhealthyNodeList() { if (flowUnit.getResourceContext().getState() == Resources.State.UNHEALTHY) { HotNodeSummary currentNodSummary = flowUnit.getSummary(); for (HotResourceSummary resourceSummary : currentNodSummary.getHotResourceSummaryList()) { - if (resourceSummary.getResourceType().getJVM() == JvmEnum.YOUNG_GEN) { + if (resourceSummary.getResource().getResourceEnum() == ResourceEnum.YOUNG_GEN) { youngGenSummaries.add(resourceSummary); } - else if 
(resourceSummary.getResourceType().getJVM() == JvmEnum.OLD_GEN) { + else if (resourceSummary.getResource().getResourceEnum() == ResourceEnum.OLD_GEN) { oldGenSummaries.add(resourceSummary); } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/HotNodeClusterRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/HotNodeClusterRca.java index 84b7a3a0a..35e745a6c 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/HotNodeClusterRca.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/HotNodeClusterRca.java @@ -15,7 +15,7 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.configs.HotNodeClusterRcaConfig; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; @@ -24,7 +24,6 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; @@ -52,7 +51,7 @@ public class 
HotNodeClusterRca extends Rca> private static final double NODE_COUNT_THRESHOLD = 0.8; private static final long TIMESTAMP_EXPIRATION_IN_MINS = 5; private final Rca> hotNodeRca; - private final Table nodeTable; + private final Table nodeTable; private final int rcaPeriod; private int counter; private List dataNodesDetails; @@ -84,9 +83,9 @@ private void addSummaryToNodeMap(List> hotNodeR } long timestamp = clock.millis(); for (HotResourceSummary resourceSummary : nodeSummary.getHotResourceSummaryList()) { - NodeResourceUsage oldUsage = nodeTable.get(nodeSummary.getNodeID(), resourceSummary.getResourceType()); + NodeResourceUsage oldUsage = nodeTable.get(nodeSummary.getNodeID(), resourceSummary.getResource()); if (oldUsage == null || oldUsage.timestamp < timestamp) { - nodeTable.put(nodeSummary.getNodeID(), resourceSummary.getResourceType(), + nodeTable.put(nodeSummary.getNodeID(), resourceSummary.getResource(), new NodeResourceUsage(timestamp, resourceSummary)); } } @@ -110,8 +109,8 @@ private ResourceFlowUnit checkUnbalancedNode() { long currTimestamp = clock.millis(); //For each resource type, scan over all the nodes in cluster and calculate its medium. 
- final List resourceTypeColumnKeys = ImmutableList.copyOf(nodeTable.columnKeySet()); - for (ResourceType resourceType : resourceTypeColumnKeys) { + final List resourceTypeColumnKeys = ImmutableList.copyOf(nodeTable.columnKeySet()); + for (Resource resourceType : resourceTypeColumnKeys) { List resourceUsages = new ArrayList<>(); for (NodeDetails nodeDetail : dataNodesDetails) { NodeResourceUsage currentUsage = nodeTable.get(nodeDetail.getId(), resourceType); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/BaseClusterRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/BaseClusterRca.java new file mode 100644 index 000000000..1e4f89d06 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/BaseClusterRca.java @@ -0,0 +1,237 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources.State; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor.NodeDetails; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * This is a generic cluster level RCA which subscripts upstream node level RCAs and generate a flowunit + * with cluster level summary that concludes the healthiness of the cluster in terms of those node level RCAs. + * This cluster RCA maintains a Table to keep track of flowunits sending from different nodes across + * the cluster. 
This table is a two-dimensional table indexed by (NodeKey, Rca Name), and each cell stores + * the last numOfFlowUnitsInMap flowunits it receives. This RCA will + * mark the cluster as unhealthy if the flowunits from any data node are unhealthy. + *

+ * A few protected variables that can be overridden by derived class: + * numOfFlowUnitsInMap : number of consecutive flowunits stored in hashtable. Default is 1 + * collectFromMasterNode : whether this RCA collect flowunit from master nodes. + * expirationTimeWindow : time window to determine whether flowunit in hashmap becomes stale + * method that can be overriden : + * generateNodeSummary(NodeKey) : how do we want to parse the table and generate summary for one node. + */ +public class BaseClusterRca extends Rca> { + + private static final Logger LOG = LogManager.getLogger(BaseClusterRca.class); + private static final int DEFAULT_NUM_OF_FLOWUNITS = 1; + private static final long TIMESTAMP_EXPIRATION_IN_MILLIS = TimeUnit.MINUTES.toMillis(10); + private final List>> nodeRcas; + // two dimensional table indexed by (NodeKey, Rca Name) => last numOfFlowUnitsInMap flowunits + protected final Table>> nodeTable; + private final int rcaPeriod; + private int counter; + protected Clock clock; + protected int numOfFlowUnitsInMap; + protected boolean collectFromMasterNode; + protected long expirationTimeWindow; + + + @SafeVarargs + public >> BaseClusterRca(final int rcaPeriod, + final R... 
nodeRca) { + super(5); + this.rcaPeriod = rcaPeriod; + this.counter = 0; + this.clock = Clock.systemUTC(); + this.numOfFlowUnitsInMap = DEFAULT_NUM_OF_FLOWUNITS; + this.nodeTable = HashBasedTable.create(); + this.collectFromMasterNode = false; + this.expirationTimeWindow = TIMESTAMP_EXPIRATION_IN_MILLIS; + this.nodeRcas = Arrays.asList(nodeRca); + } + + @VisibleForTesting + public void setClock(Clock clock) { + this.clock = clock; + } + + @VisibleForTesting + public void setCollectFromMasterNode(boolean collectFromMasterNode) { + this.collectFromMasterNode = collectFromMasterNode; + } + + //add upstream flowunits collected from different nodes into Table + private void addUpstreamFlowUnits(Rca> nodeRca) { + List> flowUnits = nodeRca.getFlowUnits(); + for (ResourceFlowUnit flowUnit : flowUnits) { + if (flowUnit.isEmpty()) { + continue; + } + HotNodeSummary nodeSummary = flowUnit.getSummary(); + NodeKey nodeKey = new NodeKey(nodeSummary.getNodeID(), nodeSummary.getHostAddress()); + + if (nodeTable.get(nodeKey, nodeRca.name()) == null) { + nodeTable.put(nodeKey, nodeRca.name(), new LinkedList<>()); + } + LinkedList> linkedList = nodeTable.get(nodeKey, nodeRca.name()); + linkedList.addLast(flowUnit); + if (linkedList.size() > numOfFlowUnitsInMap) { + linkedList.pollFirst(); + } + } + } + + private List getClusterNodesDetails() { + if (collectFromMasterNode) { + return ClusterDetailsEventProcessor.getNodesDetails(); + } + else { + return ClusterDetailsEventProcessor.getDataNodesDetails(); + } + } + + // TODO : we might need to change this function later to use EventListener + // to update the nodeMap whenever the ClusterDetailsEventProcessor is updated + // so we don't have to keep polling the NodeDetails in every time window. 
+ private void removeInactiveNodeFromNodeMap() { + Set nodeIdSet = new HashSet<>(); + List inactiveNodes = new ArrayList<>(); + for (NodeDetails nodeDetail : getClusterNodesDetails()) { + nodeIdSet.add(nodeDetail.getId()); + } + for (NodeKey nodeKey : nodeTable.rowKeySet()) { + if (!nodeIdSet.contains(nodeKey.getNodeId())) { + inactiveNodes.add(nodeKey); + LOG.info("RCA: remove node {} from node map", nodeKey); + } + } + inactiveNodes.forEach(nodeKey -> nodeTable.row(nodeKey).clear()); + } + + /** + * generate flowunit for downstream based on the flowunits this RCA collects in hashmap + * flowunits with timestamp beyond expirationTimeWindow time frame are considered + * as stale and ignored by this RCA. + * @return flowunit for downstream vertices + */ + private ResourceFlowUnit generateFlowUnit() { + List unhealthyNodeSummaries = new ArrayList<>(); + long timestamp = clock.millis(); + List clusterNodesDetails = getClusterNodesDetails(); + // iterate through this table + for (NodeDetails nodeDetails : clusterNodesDetails) { + NodeKey nodeKey = new NodeKey(nodeDetails.getId(), nodeDetails.getHostAddress()); + // skip if the node is not found in table + if (!nodeTable.containsRow(nodeKey)) { + continue; + } + HotNodeSummary newNodeSummary = generateNodeSummary(nodeKey); + if (newNodeSummary != null) { + unhealthyNodeSummaries.add(newNodeSummary); + } + } + if (!unhealthyNodeSummaries.isEmpty()) { + HotClusterSummary clusterSummary = new HotClusterSummary(clusterNodesDetails.size(), unhealthyNodeSummaries.size()); + for (HotNodeSummary nodeSummary : unhealthyNodeSummaries) { + clusterSummary.appendNestedSummary(nodeSummary); + } + return new ResourceFlowUnit<>(timestamp, new ResourceContext(Resources.State.UNHEALTHY), clusterSummary, true); + } + else { + return new ResourceFlowUnit<>(timestamp, new ResourceContext(State.HEALTHY), null); + } + } + + /** + * generate summary for node (nodeKey). 
read the flowunits of all upstream RCAs from + * this node and generate its node level summary as ouput. + * The default implementation in this method is to pick the most recent flowunits from the table + * and check the healthiness of flowunits from all up stream RCAs and whenever any flowunit is + * unhealthy, we mark the node as unhealthy and append the summary from this flowunit to the nested + * summary list of this node summary and use this summary as the final output of this method. + * @param nodeKey NodeKey of the node that we want to generate node summary for + * @return node summary for this node + */ + protected HotNodeSummary generateNodeSummary(NodeKey nodeKey) { + HotNodeSummary nodeSummary = null; + long timestamp = clock.millis(); + // for each RCA type this cluster RCA subscribes, read its most recent flowunit and if it is + // unhealthy, append this flowunit to output node summary + for (Rca> nodeRca : nodeRcas) { + // skip if we haven't receive any flowunit from this RCA yet. + if (nodeTable.get(nodeKey, nodeRca.name()) == null) { + continue; + } + ResourceFlowUnit flowUnit = nodeTable.get(nodeKey, nodeRca.name()).getLast(); + // skip this flowunit if : + // 1. the timestamp of this flowunit expires + // 2. flowunit is healthy + // 3. 
flowunit does not have summary attached to it + if (timestamp - flowUnit.getTimeStamp() > TIMESTAMP_EXPIRATION_IN_MILLIS + || flowUnit.getResourceContext().isHealthy() + || flowUnit.getSummary() == null) { + continue; + } + if (nodeSummary == null) { + nodeSummary = new HotNodeSummary(nodeKey.getNodeId(), nodeKey.getHostAddress()); + } + // append all resource summaries into this + flowUnit.getSummary().getHotResourceSummaryList().forEach(nodeSummary::appendNestedSummary); + } + return nodeSummary; + } + + @Override + public ResourceFlowUnit operate() { + counter += 1; + nodeRcas.forEach(this::addUpstreamFlowUnits); + + if (counter >= rcaPeriod) { + counter = 0; + removeInactiveNodeFromNodeMap(); + return generateFlowUnit(); + } else { + return new ResourceFlowUnit<>(System.currentTimeMillis()); + } + } + + @Override + public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { + throw new IllegalArgumentException(name() + "'s generateFlowUnitListFromWire() should not " + + "be required."); + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/NodeKey.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/NodeKey.java new file mode 100644 index 000000000..bb37997a4 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/NodeKey.java @@ -0,0 +1,62 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster; + +import org.apache.commons.lang3.builder.HashCodeBuilder; + +public class NodeKey { + private final String nodeId; + private final String hostAddress; + + public NodeKey(String nodeId, String hostAddress) { + this.nodeId = nodeId; + this.hostAddress = hostAddress; + } + + public String getNodeId() { + return nodeId; + } + + public String getHostAddress() { + return hostAddress; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof NodeKey) { + NodeKey key = (NodeKey)obj; + return nodeId.equals(key.getNodeId()) && hostAddress.equals(key.getHostAddress()); + } + return false; + } + + // the reason why we compare both node id and hostAddress here is because in + // newer ES version(6.8 and above), see https://github.com/elastic/elasticsearch/pull/19140. + // if es restart, both node id and ip address will remain the same so we can continue add + // flowunit into the same row in table before es restart. + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(nodeId) + .append(hostAddress) + .toHashCode(); + } + + @Override + public String toString() { + return nodeId + " " + hostAddress; + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/QueueRejectionClusterRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/QueueRejectionClusterRca.java new file mode 100644 index 000000000..b6d3fd505 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/cluster/QueueRejectionClusterRca.java @@ -0,0 +1,28 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). 
+ * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; + +public class QueueRejectionClusterRca extends BaseClusterRca { + public static final String RCA_TABLE_NAME = QueueRejectionClusterRca.class.getSimpleName(); + + public >> QueueRejectionClusterRca(final int rcaPeriod, final R hotNodeRca) { + super(rcaPeriod, hotNodeRca); + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hot_node/GenericResourceRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hot_node/GenericResourceRca.java index e9db13288..ba238f7ea 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hot_node/GenericResourceRca.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hot_node/GenericResourceRca.java @@ -15,7 +15,7 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.hot_node; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; @@ -50,7 +50,7 @@ public class GenericResourceRca extends Rca private final Metric resourceUsageGroupByConsumer; private final int rcaPeriod; private int counter; - private final ResourceType resourceType; + private final Resource resource; private volatile double threshold; private volatile double lowerBoundThreshold; private volatile int topK; @@ -58,14 +58,14 @@ public class GenericResourceRca extends Rca /** * @param rcaPeriod num of rca periods for each evaluation interval - * @param resourceType resource type enum + * @param resource resource type enum * @param threshold threshold to identify contented resource * @param resourceUsageGroupByConsumer aggregate metric that groups resource * metrics by some columns * @param Metric base class */ public GenericResourceRca(final int rcaPeriod, - final ResourceType resourceType, final double threshold, + final Resource resource, final double threshold, final M resourceUsageGroupByConsumer) { super(5); this.resourceUsageGroupByConsumer = resourceUsageGroupByConsumer; @@ -75,7 +75,7 @@ public GenericResourceRca(final int rcaPeriod, this.clock = Clock.systemUTC(); this.threshold = threshold; this.lowerBoundThreshold = DEFAULT_LOWER_BOUND_THRESHOLD; - this.resourceType = resourceType; + this.resource = resource; this.topK = DEFAULT_TOP_K; } @@ -158,7 +158,7 @@ public ResourceFlowUnit operate() { //check to see if the value is above lower bound thres if (!Double.isNaN(avgCpuUsage) && avgCpuUsage >= lowerBoundThreshold) { - summary = new HotResourceSummary(this.resourceType, threshold, + summary = new HotResourceSummary(this.resource, threshold, avgCpuUsage, SLIDING_WINDOW_IN_MIN * 60); addTopConsumerSummary(summary); } diff --git 
a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hot_node/HighCpuRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hot_node/HighCpuRca.java index dd71b7db4..357eb31da 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hot_node/HighCpuRca.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hot_node/HighCpuRca.java @@ -15,9 +15,8 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.hot_node; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HardwareEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; /** * This RCA can be used to calculate total cpu usage(combines all physical cores) and @@ -27,8 +26,7 @@ public class HighCpuRca extends GenericResourceRca { private static final double CPU_USAGE_THRESHOLD = 0.7; public HighCpuRca(final int rcaPeriod, final M cpuUsageGroupByOperation) { - super(rcaPeriod, ResourceType.newBuilder().setHardwareResourceType(HardwareEnum.CPU).build(), - 0, cpuUsageGroupByOperation); + super(rcaPeriod, ResourceUtil.CPU_USAGE, 0, cpuUsageGroupByOperation); int cores = Runtime.getRuntime().availableProcessors(); this.setThreshold(CPU_USAGE_THRESHOLD * (double)cores); } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotheap/HighHeapUsageOldGenRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotheap/HighHeapUsageOldGenRca.java index 5d73a85af..794e5d304 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotheap/HighHeapUsageOldGenRca.java +++ 
b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotheap/HighHeapUsageOldGenRca.java @@ -18,10 +18,9 @@ import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.GCType.OLD_GEN; import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.GCType.TOT_FULL_GC; import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.HeapDimension.MEM_TYPE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.OLD_GEN_HEAP_USAGE; import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.JvmEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.configs.HighHeapUsageOldGenRcaConfig; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric; @@ -77,7 +76,6 @@ public class HighHeapUsageOldGenRca extends Rca nodeStatAggregators; - private final ResourceType resourceType; // the amount of RCA period this RCA needs to run before sending out a flowunit private final int rcaPeriod; // The lower bound threshold in percentage to decide whether to send out summary. @@ -108,7 +106,6 @@ public HighHeapUsageOldGenRca(final int rcaPeriod, final doub this.lowerBoundThreshold = (lowerBoundThreshold >= 0 && lowerBoundThreshold <= 1.0) ? 
lowerBoundThreshold : 1.0; this.counter = 0; - this.resourceType = ResourceType.newBuilder().setJVM(JvmEnum.OLD_GEN).build(); gcEventSlidingWindow = new SlidingWindow<>(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES); minOldGenSlidingWindow = new MinOldGenSlidingWindow(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES); @@ -214,7 +211,7 @@ public ResourceFlowUnit operate() { if (gcEventSlidingWindow.readSum() >= OLD_GEN_GC_THRESHOLD && !Double.isNaN(currentMinOldGenUsage) && currentMinOldGenUsage / maxOldGenHeapSize > OLD_GEN_USED_THRESHOLD_IN_PERCENTAGE * this.lowerBoundThreshold) { - summary = new HotResourceSummary(this.resourceType, + summary = new HotResourceSummary(OLD_GEN_HEAP_USAGE, OLD_GEN_USED_THRESHOLD_IN_PERCENTAGE, currentMinOldGenUsage / maxOldGenHeapSize, SLIDING_WINDOW_SIZE_IN_MINS * 60); addTopConsumers(summary); } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotheap/HighHeapUsageYoungGenRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotheap/HighHeapUsageYoungGenRca.java index eb37e606b..1898f9deb 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotheap/HighHeapUsageYoungGenRca.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotheap/HighHeapUsageYoungGenRca.java @@ -18,12 +18,10 @@ import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.GCType.OLD_GEN; import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.GCType.TOT_YOUNG_GC; import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.HeapDimension.MEM_TYPE; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.YOUNG_GEN_PROMOTION_RATE; import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; -import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.JvmEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.configs.HighHeapUsageOldGenRcaConfig; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.configs.HighHeapUsageYoungGenRcaConfig; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; @@ -39,7 +37,6 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaVerticesMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; import java.time.Clock; -import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -59,7 +56,6 @@ public class HighHeapUsageYoungGenRca extends Rca HighHeapUsageYoungGenRca(final int rcaPeriod, final do this.lowerBoundThreshold = (lowerBoundThreshold >= 0 && lowerBoundThreshold <= 1.0) ? 
lowerBoundThreshold : 1.0; this.counter = 0; - this.resourceType = ResourceType.newBuilder().setJVM(JvmEnum.YOUNG_GEN).build(); this.gcTimeDeque = new SlidingWindow<>(PROMOTION_RATE_SLIDING_WINDOW_IN_MINS, TimeUnit.MINUTES); this.promotionRateThreshold = HighHeapUsageYoungGenRcaConfig.DEFAULT_PROMOTION_RATE_THRESHOLD_IN_MB_PER_SEC; this.youngGenGcTimeThreshold = HighHeapUsageYoungGenRcaConfig.DEFAULT_YOUNG_GEN_GC_TIME_THRESHOLD_IN_MS_PER_SEC; @@ -182,7 +177,7 @@ public ResourceFlowUnit operate() { //check to see if the value is above lower bound thres if (!Double.isNaN(avgPromotionRate) && avgPromotionRate > promotionRateThreshold * this.lowerBoundThreshold) { - summary = new HotResourceSummary(this.resourceType, + summary = new HotResourceSummary(YOUNG_GEN_PROMOTION_RATE, promotionRateThreshold, avgPromotionRate, PROMOTION_RATE_SLIDING_WINDOW_IN_MINS * 60); } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotheap/NodeStatAggregator.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotheap/NodeStatAggregator.java index fdff4f106..6c0ac6172 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotheap/NodeStatAggregator.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotheap/NodeStatAggregator.java @@ -98,12 +98,12 @@ public void collect(final long timestamp) { // in either case, we need to write a function to clean up this hashtable on reader periodically // to remove node stats of inactive shards private void purgeHashTable(final long timestamp) { - Iterator iterator = this.shardKeyMap.keySet().iterator(); + Iterator iterator = this.shardKeyMap.values().iterator(); while (iterator.hasNext()) { - IndexShardKey key = iterator.next(); - long timestampDiff = timestamp - this.shardKeyMap.get(key).getTimestamp(); + NodeStatValue value = iterator.next(); + long timestampDiff = timestamp - 
value.getTimestamp(); if (TimeUnit.MILLISECONDS.toMinutes(timestampDiff) > PURGE_HASH_TABLE_INTERVAL_IN_MINS) { - this.sum -= this.shardKeyMap.get(key).getValue(); + this.sum -= value.getValue(); iterator.remove(); } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotshard/HotShardClusterRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotshard/HotShardClusterRca.java index 73c1ebf20..ddc29d8d6 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotshard/HotShardClusterRca.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotshard/HotShardClusterRca.java @@ -15,8 +15,7 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.hotshard; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HardwareEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.configs.HotShardClusterRcaConfig; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; @@ -26,6 +25,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotShardSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; @@ -134,11 +134,11 @@ private double getThresholdValue(Map perIndexShardInfo, do * @param resourceInfoTable Guava Table with 'Index_Name', 'NodeShardKey' and 'UsageValue' * @param thresholdInPercentage Threshold for the resource in percentage * @param hotResourceSummaryList Summary List for hot shards - * @param resourceType Resource Type + * @param resource Resource message object defined in protobuf * */ private void findHotShardAndCreateSummary(Table resourceInfoTable, double thresholdInPercentage, - List hotResourceSummaryList, ResourceType resourceType) { + List hotResourceSummaryList, Resource resource) { for (String indexName : resourceInfoTable.rowKeySet()) { Map perIndexShardInfo = resourceInfoTable.row(indexName); double thresholdValue = getThresholdValue(perIndexShardInfo, thresholdInPercentage); @@ -149,7 +149,7 @@ private void findHotShardAndCreateSummary(Table re { shardInfo.getKey().getNodeId(), indexName, shardInfo.getKey().getShardId() }); // Add to hotResourceSummaryList - hotResourceSummaryList.add(new HotResourceSummary(resourceType, thresholdValue, + hotResourceSummaryList.add(new HotResourceSummary(resource, thresholdValue, shardInfo.getValue(), SLIDING_WINDOW_IN_SECONDS, shardIdentifier)); } } @@ -189,15 +189,15 @@ public ResourceFlowUnit operate() { // We evaluate hot shards individually on all the 3 dimensions findHotShardAndCreateSummary( cpuUtilizationInfoTable, cpuUtilizationClusterThreshold, hotShardSummaryList, - ResourceType.newBuilder().setHardwareResourceTypeValue(HardwareEnum.CPU_VALUE).build()); + ResourceUtil.CPU_USAGE); findHotShardAndCreateSummary( IOThroughputInfoTable, ioTotThroughputClusterThreshold, hotShardSummaryList, - ResourceType.newBuilder().setHardwareResourceTypeValue(HardwareEnum.IO_TOTAL_THROUGHPUT_VALUE).build()); + 
ResourceUtil.IO_TOTAL_THROUGHPUT); findHotShardAndCreateSummary( IOSysCallRateInfoTable, ioTotSysCallRateClusterThreshold, hotShardSummaryList, - ResourceType.newBuilder().setHardwareResourceTypeValue(HardwareEnum.IO_TOTAL_SYS_CALLRATE_VALUE).build()); + ResourceUtil.IO_TOTAL_SYS_CALLRATE); if (hotShardSummaryList.isEmpty()) { context = new ResourceContext(Resources.State.HEALTHY); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotshard/HotShardRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotshard/HotShardRca.java index be34c8b0d..7caeed265 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotshard/HotShardRca.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/hotshard/HotShardRca.java @@ -21,8 +21,9 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HardwareEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.MetricEnum; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.configs.HotShardRcaConfig; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric; @@ -35,7 +36,6 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotShardSummary; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; @@ -78,7 +78,6 @@ public class HotShardRca extends Rca> { private final Metric cpuUtilization; private final Metric ioTotThroughput; private final Metric ioTotSyscallRate; - private final ResourceType resourceType; private final int rcaPeriod; private int counter; protected Clock clock; @@ -96,7 +95,6 @@ public HotShardRca(final long evaluationIntervalSeconds, fina this.ioTotSyscallRate = ioTotSyscallRate; this.rcaPeriod = rcaPeriod; this.counter = 0; - this.resourceType = ResourceType.newBuilder().setHardwareResourceTypeValue(HardwareEnum.CPU_VALUE).build(); this.clock = Clock.systemUTC(); this.cpuUtilizationMap = new HashMap<>(); this.ioTotThroughputMap = new HashMap<>(); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/threadpool/QueueRejectionRca.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/threadpool/QueueRejectionRca.java index 3530fcafe..aaec82519 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/threadpool/QueueRejectionRca.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/store/rca/threadpool/QueueRejectionRca.java @@ -18,8 +18,9 @@ import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; -import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ThreadPoolEnum; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.MetricEnum; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric; @@ -31,6 +32,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor; import com.google.common.annotations.VisibleForTesting; @@ -63,9 +65,9 @@ public QueueRejectionRca(final int rcaPeriod, M threadPool_Re counter = 0; clock = Clock.systemUTC(); queueRejectionCollectors = Collections.unmodifiableList(Arrays.asList( - new QueueRejectionCollector(ThreadPoolEnum.WRITE_QUEUE, ThreadPoolType.WRITE, + new QueueRejectionCollector(ResourceUtil.WRITE_QUEUE_REJECTION, ThreadPoolType.WRITE, threadPool_RejectedReqs, REJECTION_TIME_PERIOD_IN_MILLISECOND), - new QueueRejectionCollector(ThreadPoolEnum.SEARCH_QUEUE, ThreadPoolType.SEARCH, + new QueueRejectionCollector(ResourceUtil.SEARCH_QUEUE_REJECTION, ThreadPoolType.SEARCH, 
threadPool_RejectedReqs, REJECTION_TIME_PERIOD_IN_MILLISECOND) )); } @@ -129,16 +131,16 @@ public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { * A collector class to collect rejection from each queue type */ private static class QueueRejectionCollector { - private final ResourceType threadPool; + private final Resource threadPool; private final ThreadPoolType threadPoolMetric; private final Metric threadPool_RejectedReqs; private boolean hasRejection; private long rejectionTimestamp; private long rejectionTimePeriodThreshold; - public QueueRejectionCollector(final ThreadPoolEnum threadPool, final ThreadPoolType threadPoolMetric, + public QueueRejectionCollector(final Resource threadPool, final ThreadPoolType threadPoolMetric, final Metric threadPool_RejectedReqs, final long threshold) { - this.threadPool = ResourceType.newBuilder().setThreadPool(threadPool).build(); + this.threadPool = threadPool; this.threadPoolMetric = threadPoolMetric; this.threadPool_RejectedReqs = threadPool_RejectedReqs; this.hasRejection = false; diff --git a/src/main/proto/inter_node_rpc_service.proto b/src/main/proto/inter_node_rpc_service.proto index 2678ba2a7..b0865e56a 100644 --- a/src/main/proto/inter_node_rpc_service.proto +++ b/src/main/proto/inter_node_rpc_service.proto @@ -48,40 +48,50 @@ message ResourceContextMessage { /* Enum for different resource type */ -message ResourceTypeOptions { - string resourceTypeName = 1; - string resourceTypeUnit = 2; +message AdditionalFields { + string name = 1; + string description = 2; } extend google.protobuf.EnumValueOptions { - ResourceTypeOptions resourceTypeOptions = 50000; + AdditionalFields additional_fields = 50000; } -enum JvmEnum { - OLD_GEN = 0 [(resourceTypeOptions).resourceTypeName = "old gen", (resourceTypeOptions).resourceTypeUnit = "heap usage in percentage"]; - YOUNG_GEN = 1 [(resourceTypeOptions).resourceTypeName = "young gen", (resourceTypeOptions).resourceTypeUnit = "promotion rate in mb/s"]; -} +enum 
ResourceEnum { + // JVM + OLD_GEN = 0 [(additional_fields).name = "old gen"]; + YOUNG_GEN = 1 [(additional_fields).name = "young gen"]; + + // hardware + CPU = 2 [(additional_fields).name = "cpu usage"]; + IO = 3 [(additional_fields).name = "IO"]; -enum HardwareEnum { - CPU = 0 [(resourceTypeOptions).resourceTypeName = "cpu usage", (resourceTypeOptions).resourceTypeUnit = "cpu usage in percentage"]; - IO_TOTAL_THROUGHPUT = 1 [(resourceTypeOptions).resourceTypeName = "IO total throughput", (resourceTypeOptions).resourceTypeUnit = "number of bytes read/written per second"]; - IO_TOTAL_SYS_CALLRATE = 2 [(resourceTypeOptions).resourceTypeName = "IO total sys callrate", (resourceTypeOptions).resourceTypeUnit = "read and write system calls per second"]; + // threadpool + WRITE_THREADPOOL = 4 [(additional_fields).name = "write threadpool"]; + SEARCH_THREADPOOL = 5 [(additional_fields).name = "search threadpool"]; } -enum ThreadPoolEnum { - WRITE_QUEUE = 0 [(resourceTypeOptions).resourceTypeName = "write queue", (resourceTypeOptions).resourceTypeUnit = "rejection period in second"]; - SEARCH_QUEUE = 1 [(resourceTypeOptions).resourceTypeName = "search queue", (resourceTypeOptions).resourceTypeUnit = "rejection period in second"]; +enum MetricEnum { + // JVM + HEAP_USAGE = 0 [(additional_fields).name = "heap usage", (additional_fields).description = "memory usage in percentage"]; + PROMOTION_RATE = 1 [(additional_fields).name = "promotion rate", (additional_fields).description = "mb/s"]; + MINOR_GC = 2 [(additional_fields).name = "minor gc", (additional_fields).description = "time in percentage"]; + + // hardware + CPU_USAGE = 3 [(additional_fields).name = "cpu usage", (additional_fields).description = "num of cores"]; + TOTAL_THROUGHPUT = 4 [(additional_fields).name = "total throughput", (additional_fields).description = "number of bytes read/written per second"]; + TOTAL_SYS_CALLRATE = 5 [(additional_fields).name = "total sys callrate", (additional_fields).description = 
"read and write system calls per second"]; + + // threadpool + QUEUE_REJECTION = 6 [(additional_fields).name = "queue rejection", (additional_fields).description = "rejection period in second"]; } /* message for resource type Enum */ -message ResourceType { - oneof resource_type_oneof { - JvmEnum JVM = 1; - HardwareEnum hardware_resource_type = 2; - ThreadPoolEnum thread_pool = 3; - } +message Resource { + ResourceEnum resource_enum = 1; + MetricEnum metric_enum = 2; } /* @@ -104,7 +114,7 @@ message TopConsumerSummaryMessage { double value = 2; } message HotResourceSummaryMessage { - ResourceType resourceType = 1; + Resource resource = 1; TopConsumerSummaryList consumers = 2; double threshold = 3; double value = 4; diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/RcaTestHelper.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/RcaTestHelper.java index 50afefef5..5b74cb1aa 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/RcaTestHelper.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/RcaTestHelper.java @@ -15,7 +15,7 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; @@ -23,24 +23,51 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotShardSummary; import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; - +import java.time.Clock; import java.util.Arrays; import java.util.Collections; import java.util.List; public class RcaTestHelper extends Rca> { + private Clock clock; + private String rcaName; + public RcaTestHelper() { super(5); + this.clock = Clock.systemUTC(); + this.rcaName = name(); + } + + public RcaTestHelper(String rcaName) { + this(); + this.rcaName = rcaName; } public void mockFlowUnit(ResourceFlowUnit flowUnit) { this.flowUnits = Collections.singletonList(flowUnit); } + public void mockFlowUnit(ResourceFlowUnit flowUnit1, ResourceFlowUnit flowUnit2) { + this.flowUnits = Arrays.asList(flowUnit1, flowUnit2); + } + + public void mockFlowUnit() { + this.flowUnits = Collections.singletonList((ResourceFlowUnit)ResourceFlowUnit.generic()); + } + public void mockFlowUnits(List> flowUnitList) { this.flowUnits = flowUnitList; } + public void setClock(Clock clock) { + this.clock = clock; + } + + @Override + public String name() { + return rcaName; + } + @Override public ResourceFlowUnit operate() { return null; @@ -50,7 +77,7 @@ public ResourceFlowUnit operate() { public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { } - public static ResourceFlowUnit generateFlowUnit(ResourceType type, String nodeID, Resources.State healthy) { + public static ResourceFlowUnit generateFlowUnit(Resource type, String nodeID, Resources.State healthy) { HotResourceSummary resourceSummary = new HotResourceSummary(type, 10, 5, 60); HotNodeSummary nodeSummary = new HotNodeSummary(nodeID, "127.0.0.0"); @@ -58,6 +85,24 @@ public static ResourceFlowUnit generateFlowUnit(ResourceType typ return new ResourceFlowUnit<>(System.currentTimeMillis(), new ResourceContext(healthy), nodeSummary); } + public static ResourceFlowUnit generateFlowUnit(Resource type, String nodeID, + 
String hostAddress, Resources.State healthy) { + HotResourceSummary resourceSummary = new HotResourceSummary(type, + 10, 5, 60); + HotNodeSummary nodeSummary = new HotNodeSummary(nodeID, hostAddress); + nodeSummary.appendNestedSummary(resourceSummary); + return new ResourceFlowUnit<>(System.currentTimeMillis(), new ResourceContext(healthy), nodeSummary); + } + + public static ResourceFlowUnit generateFlowUnit(Resource type, String nodeID, + String hostAddress, Resources.State healthy, long timestamp) { + HotResourceSummary resourceSummary = new HotResourceSummary(type, + 10, 5, 60); + HotNodeSummary nodeSummary = new HotNodeSummary(nodeID, hostAddress); + nodeSummary.appendNestedSummary(resourceSummary); + return new ResourceFlowUnit<>(timestamp, new ResourceContext(healthy), nodeSummary); + } + public static ResourceFlowUnit generateFlowUnitForHotShard(String indexName, String shardId, String nodeID, double cpu_utilization, double io_throughput, double io_sys_callrate, Resources.State health) { HotShardSummary hotShardSummary = new HotShardSummary(indexName, shardId, nodeID, 60); diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotClusterSummaryTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotClusterSummaryTest.java index 197e3e72c..a568c3a18 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotClusterSummaryTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotClusterSummaryTest.java @@ -16,7 +16,6 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary.SQL_SCHEMA_CONSTANTS; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.google.gson.JsonElement; diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummaryTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummaryTest.java index 37975d136..c1be8ef10 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummaryTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotNodeSummaryTest.java @@ -17,13 +17,10 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HotNodeSummaryMessage; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.JvmEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary.SQL_SCHEMA_CONSTANTS; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import java.util.Arrays; import java.util.List; import org.jooq.Field; import org.jooq.Record; @@ -35,7 +32,6 @@ public class HotNodeSummaryTest { private static final String NODE_ID = "ABC123"; private static final String HOST_ADDRESS = "127.0.0.0"; - private static final ResourceType RESOURCE_TYPE = ResourceType.newBuilder().setJVM(JvmEnum.YOUNG_GEN).build(); private static final double THRESHOLD = 3.14; private static final double VALUE = 2.71; private static HotNodeSummary uut; @@ -43,7 +39,7 @@ public class HotNodeSummaryTest { @BeforeClass public static void setup() { uut = new HotNodeSummary(NODE_ID, HOST_ADDRESS); - 
uut.appendNestedSummary(new HotResourceSummary(RESOURCE_TYPE, THRESHOLD, VALUE, 0)); + uut.appendNestedSummary(new HotResourceSummary(ResourceUtil.YOUNG_GEN_PROMOTION_RATE, THRESHOLD, VALUE, 0)); } @Test diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotResourceSummaryTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotResourceSummaryTest.java index c28e58d53..91e03cbed 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotResourceSummaryTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotResourceSummaryTest.java @@ -17,8 +17,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HotResourceSummaryMessage; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.JvmEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.google.gson.JsonElement; import com.google.gson.JsonObject; @@ -31,7 +30,7 @@ import org.mockito.Mockito; public class HotResourceSummaryTest { - private final ResourceType RESOURCE_TYPE = ResourceType.newBuilder().setJVM(JvmEnum.YOUNG_GEN).build(); + private final Resource RESOURCE_TYPE = ResourceUtil.YOUNG_GEN_PROMOTION_RATE; private final double THRESHOLD = 2.718; private final double VALUE = 3.14159; private final double AVG_VALUE = 1.414; @@ -55,7 +54,7 @@ public void setup() { public void testBuildSummaryMessage() { Assert.assertEquals(1, uut.getNestedSummaryList().size()); HotResourceSummaryMessage msg = uut.buildSummaryMessage(); - Assert.assertEquals(RESOURCE_TYPE, 
msg.getResourceType()); + Assert.assertEquals(RESOURCE_TYPE, msg.getResource()); Assert.assertEquals(THRESHOLD, msg.getThreshold(), 0); Assert.assertEquals(VALUE, msg.getValue(), 0); Assert.assertEquals(AVG_VALUE, msg.getAvgValue(), 0); @@ -79,7 +78,7 @@ public void testBuildSummaryMessageAndAddToFlowUnit() { @Test public void testBuildHotResourceSummaryFromMessage() { HotResourceSummaryMessage msg = uut.buildSummaryMessage(); - Assert.assertEquals(RESOURCE_TYPE, msg.getResourceType()); + Assert.assertEquals(RESOURCE_TYPE, msg.getResource()); Assert.assertEquals(THRESHOLD, msg.getThreshold(), 0); Assert.assertEquals(VALUE, msg.getValue(), 0); Assert.assertEquals(AVG_VALUE, msg.getAvgValue(), 0); @@ -94,9 +93,9 @@ public void testBuildHotResourceSummaryFromMessage() { @Test public void testToString() { - String expected = ResourceTypeUtil.getResourceTypeName(RESOURCE_TYPE) + " " + THRESHOLD + " " + VALUE - + " " + ResourceTypeUtil.getResourceTypeUnit(RESOURCE_TYPE) + " " + uut.getNestedSummaryList() - + " " + META_DATA; + String expected = ResourceUtil.getResourceTypeName(RESOURCE_TYPE) + + " " + ResourceUtil.getResourceMetricName(RESOURCE_TYPE) + " " + THRESHOLD + " " + VALUE + + " " + uut.getNestedSummaryList() + " " + META_DATA; Assert.assertEquals(expected, uut.toString()); } @@ -110,12 +109,12 @@ public void testGetSqlSchema() { List> schema = uut.getSqlSchema(); Assert.assertEquals(9, schema.size()); Assert.assertEquals(HotResourceSummary.ResourceSummaryField.RESOURCE_TYPE_FIELD.getField(), schema.get(0)); - Assert.assertEquals(HotResourceSummary.ResourceSummaryField.THRESHOLD_FIELD.getField(), schema.get(1)); - Assert.assertEquals(HotResourceSummary.ResourceSummaryField.VALUE_FIELD.getField(), schema.get(2)); - Assert.assertEquals(HotResourceSummary.ResourceSummaryField.AVG_VALUE_FIELD.getField(), schema.get(3)); - Assert.assertEquals(HotResourceSummary.ResourceSummaryField.MIN_VALUE_FIELD.getField(), schema.get(4)); - 
Assert.assertEquals(HotResourceSummary.ResourceSummaryField.MAX_VALUE_FIELD.getField(), schema.get(5)); - Assert.assertEquals(HotResourceSummary.ResourceSummaryField.UNIT_TYPE_FIELD.getField(), schema.get(6)); + Assert.assertEquals(HotResourceSummary.ResourceSummaryField.RESOURCE_METRIC_FIELD.getField(), schema.get(1)); + Assert.assertEquals(HotResourceSummary.ResourceSummaryField.THRESHOLD_FIELD.getField(), schema.get(2)); + Assert.assertEquals(HotResourceSummary.ResourceSummaryField.VALUE_FIELD.getField(), schema.get(3)); + Assert.assertEquals(HotResourceSummary.ResourceSummaryField.AVG_VALUE_FIELD.getField(), schema.get(4)); + Assert.assertEquals(HotResourceSummary.ResourceSummaryField.MIN_VALUE_FIELD.getField(), schema.get(5)); + Assert.assertEquals(HotResourceSummary.ResourceSummaryField.MAX_VALUE_FIELD.getField(), schema.get(6)); Assert.assertEquals(HotResourceSummary.ResourceSummaryField.TIME_PERIOD_FIELD.getField(), schema.get(7)); } @@ -123,13 +122,13 @@ public void testGetSqlSchema() { public void testGetSqlValue() { List values = uut.getSqlValue(); Assert.assertEquals(9, values.size()); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(RESOURCE_TYPE), values.get(0)); - Assert.assertEquals(THRESHOLD, values.get(1)); - Assert.assertEquals(VALUE, values.get(2)); - Assert.assertEquals(AVG_VALUE, values.get(3)); - Assert.assertEquals(MIN_VALUE, values.get(4)); - Assert.assertEquals(MAX_VALUE, values.get(5)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeUnit(RESOURCE_TYPE), values.get(6)); + Assert.assertEquals(RESOURCE_TYPE.getResourceEnumValue(), values.get(0)); + Assert.assertEquals(RESOURCE_TYPE.getMetricEnumValue(), values.get(1)); + Assert.assertEquals(THRESHOLD, values.get(2)); + Assert.assertEquals(VALUE, values.get(3)); + Assert.assertEquals(AVG_VALUE, values.get(4)); + Assert.assertEquals(MIN_VALUE, values.get(5)); + Assert.assertEquals(MAX_VALUE, values.get(6)); Assert.assertEquals(TIME_PERIOD, values.get(7)); 
Assert.assertEquals(META_DATA, values.get(8)); } @@ -139,8 +138,10 @@ public void testToJson() { JsonElement elem = uut.toJson(); Assert.assertTrue(elem.isJsonObject()); JsonObject json = ((JsonObject) elem); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(RESOURCE_TYPE), + Assert.assertEquals(ResourceUtil.getResourceTypeName(RESOURCE_TYPE), json.get(HotResourceSummary.SQL_SCHEMA_CONSTANTS.RESOURCE_TYPE_COL_NAME).getAsString()); + Assert.assertEquals(ResourceUtil.getResourceMetricName(RESOURCE_TYPE), + json.get(HotResourceSummary.SQL_SCHEMA_CONSTANTS.RESOURCE_METRIC_COL_NAME).getAsString()); Assert.assertEquals(THRESHOLD, json.get(HotResourceSummary.SQL_SCHEMA_CONSTANTS.THRESHOLD_COL_NAME).getAsDouble(), 0); Assert.assertEquals(VALUE, @@ -151,8 +152,6 @@ public void testToJson() { json.get(HotResourceSummary.SQL_SCHEMA_CONSTANTS.MIN_VALUE_COL_NAME).getAsDouble(), 0); Assert.assertEquals(MAX_VALUE, json.get(HotResourceSummary.SQL_SCHEMA_CONSTANTS.MAX_VALUE_COL_NAME).getAsDouble(), 0); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeUnit(RESOURCE_TYPE), - json.get(HotResourceSummary.SQL_SCHEMA_CONSTANTS.UNIT_TYPE_COL_NAME).getAsString()); Assert.assertEquals(TIME_PERIOD, json.get(HotResourceSummary.SQL_SCHEMA_CONSTANTS.TIME_PERIOD_COL_NAME).getAsDouble(), 0); Assert.assertEquals(META_DATA, @@ -170,8 +169,11 @@ public void testBuildSummary() { Assert.assertNull(HotResourceSummary.buildSummary(null)); Record testRecord = Mockito.mock(Record.class); Mockito.when( - testRecord.get(HotResourceSummary.ResourceSummaryField.RESOURCE_TYPE_FIELD.getField(), String.class)) - .thenReturn(ResourceTypeUtil.getResourceTypeName(RESOURCE_TYPE)); + testRecord.get(HotResourceSummary.ResourceSummaryField.RESOURCE_TYPE_FIELD.getField(), Integer.class)) + .thenReturn(RESOURCE_TYPE.getResourceEnumValue()); + Mockito.when( + testRecord.get(HotResourceSummary.ResourceSummaryField.RESOURCE_METRIC_FIELD.getField(), Integer.class)) + 
.thenReturn(RESOURCE_TYPE.getMetricEnumValue()); Mockito.when(testRecord.get(HotResourceSummary.ResourceSummaryField.THRESHOLD_FIELD.getField(), Double.class)) .thenReturn(THRESHOLD); Mockito.when(testRecord.get(HotResourceSummary.ResourceSummaryField.VALUE_FIELD.getField(), Double.class)) @@ -188,13 +190,13 @@ public void testBuildSummary() { Assert.assertNotNull(summary); List values = summary.getSqlValue(); Assert.assertEquals(9, values.size()); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(RESOURCE_TYPE), values.get(0)); - Assert.assertEquals(THRESHOLD, values.get(1)); - Assert.assertEquals(VALUE, values.get(2)); - Assert.assertEquals(AVG_VALUE, values.get(3)); - Assert.assertEquals(MIN_VALUE, values.get(4)); - Assert.assertEquals(MAX_VALUE, values.get(5)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeUnit(RESOURCE_TYPE), values.get(6)); + Assert.assertEquals(RESOURCE_TYPE.getResourceEnumValue(), values.get(0)); + Assert.assertEquals(RESOURCE_TYPE.getMetricEnumValue(), values.get(1)); + Assert.assertEquals(THRESHOLD, values.get(2)); + Assert.assertEquals(VALUE, values.get(3)); + Assert.assertEquals(AVG_VALUE, values.get(4)); + Assert.assertEquals(MIN_VALUE, values.get(5)); + Assert.assertEquals(MAX_VALUE, values.get(6)); Assert.assertEquals(TIME_PERIOD, values.get(7)); } } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotShardSummaryTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotShardSummaryTest.java index 94ce9d068..6490595d3 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotShardSummaryTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/HotShardSummaryTest.java @@ -17,8 +17,6 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.FlowUnitMessage; import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HotShardSummaryMessage; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.JvmEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.google.gson.JsonElement; import com.google.gson.JsonObject; diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceTypeUtilTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceTypeUtilTest.java deleted file mode 100644 index 219d1b439..000000000 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceTypeUtilTest.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries; - -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HardwareEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.JvmEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -public class ResourceTypeUtilTest { - private static ResourceType jvmOldGen; - private static ResourceType jvmYoungGen; - private static ResourceType hardwareCpu; - private static ResourceType unrecognized; - - @BeforeClass - public static void setup() { - jvmOldGen = ResourceType.newBuilder().setJVM(JvmEnum.OLD_GEN).build(); - jvmYoungGen = ResourceType.newBuilder().setJVM(JvmEnum.YOUNG_GEN).build(); - hardwareCpu = ResourceType.newBuilder().setHardwareResourceType(HardwareEnum.CPU).build(); - unrecognized = ResourceType.newBuilder().build(); - } - - @Test - public void testGetResourceTypeName() { - Assert.assertEquals(ResourceTypeUtil.UNKNOWN_RESOURCE_TYPE_NAME, - 
ResourceTypeUtil.getResourceTypeName(unrecognized)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeOptions(JvmEnum.OLD_GEN).getResourceTypeName(), - ResourceTypeUtil.getResourceTypeName(jvmOldGen)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeOptions(JvmEnum.YOUNG_GEN).getResourceTypeName(), - ResourceTypeUtil.getResourceTypeName(jvmYoungGen)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeOptions(HardwareEnum.CPU).getResourceTypeName(), - ResourceTypeUtil.getResourceTypeName(hardwareCpu)); - } - - @Test - public void testGetResourceTypeUnit() { - Assert.assertEquals(ResourceTypeUtil.UNKNOWN_RESOURCE_TYPE_UNIT, - ResourceTypeUtil.getResourceTypeUnit(unrecognized)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeOptions(JvmEnum.OLD_GEN).getResourceTypeUnit(), - ResourceTypeUtil.getResourceTypeUnit(jvmOldGen)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeOptions(JvmEnum.YOUNG_GEN).getResourceTypeUnit(), - ResourceTypeUtil.getResourceTypeUnit(jvmYoungGen)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeOptions(HardwareEnum.CPU).getResourceTypeUnit(), - ResourceTypeUtil.getResourceTypeUnit(hardwareCpu)); - } - - @Test - public void testBuildResourceType() { - Assert.assertNull(ResourceTypeUtil.buildResourceType(null)); - Assert.assertNull(ResourceTypeUtil.buildResourceType("unrecognized")); - ResourceType oldGen = ResourceTypeUtil.buildResourceType( - ResourceTypeUtil.getResourceTypeOptions(JvmEnum.OLD_GEN).getResourceTypeName()); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(jvmOldGen), - ResourceTypeUtil.getResourceTypeName(oldGen)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeUnit(jvmOldGen), - ResourceTypeUtil.getResourceTypeUnit(oldGen)); - ResourceType youngGen = ResourceTypeUtil.buildResourceType( - ResourceTypeUtil.getResourceTypeOptions(JvmEnum.YOUNG_GEN).getResourceTypeName()); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(jvmYoungGen), - 
ResourceTypeUtil.getResourceTypeName(youngGen)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeUnit(jvmYoungGen), - ResourceTypeUtil.getResourceTypeUnit(youngGen)); - ResourceType cpu = ResourceTypeUtil.buildResourceType( - ResourceTypeUtil.getResourceTypeOptions(HardwareEnum.CPU).getResourceTypeName()); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(hardwareCpu), - ResourceTypeUtil.getResourceTypeName(cpu)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeUnit(hardwareCpu), - ResourceTypeUtil.getResourceTypeUnit(cpu)); - } -} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtilTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtilTest.java new file mode 100644 index 000000000..f1a1efff5 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtilTest.java @@ -0,0 +1,30 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; +import org.junit.Assert; +import org.junit.Test; + +public class ResourceUtilTest { + + @Test + public void testGetResourceTypeName() { + Assert.assertEquals("old gen", ResourceUtil.getResourceTypeName(ResourceUtil.OLD_GEN_HEAP_USAGE)); + Assert.assertEquals("cpu usage", ResourceUtil.getResourceTypeName(ResourceUtil.CPU_USAGE)); + } + + @Test + public void testGetResourceTypeUnit() { + Assert.assertEquals("heap usage(memory usage in percentage)", + ResourceUtil.getResourceMetricName(ResourceUtil.OLD_GEN_HEAP_USAGE)); + Assert.assertEquals("cpu usage(num of cores)", + ResourceUtil.getResourceMetricName(ResourceUtil.CPU_USAGE)); + } + + @Test + public void testBuildResourceType() { + Resource oldGen = ResourceUtil.buildResource(0, 0); + Assert.assertEquals(oldGen, 
ResourceUtil.OLD_GEN_HEAP_USAGE); + Resource cpuResource = ResourceUtil.buildResource(2, 3); + Assert.assertEquals(cpuResource, ResourceUtil.CPU_USAGE); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistFlowUnitAndSummaryTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistFlowUnitAndSummaryTest.java index 60373ef9c..64e333f26 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistFlowUnitAndSummaryTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/PersistFlowUnitAndSummaryTest.java @@ -16,8 +16,6 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.persistence; import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerThreads; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.JvmEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.NodeRole; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.GradleTaskForRca; @@ -31,6 +29,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Queryable; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.RcaConf; import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.ThresholdMain; @@ -69,7 +68,7 @@ public DummyYoungGenRca(M metric) { public ResourceFlowUnit operate() { ResourceContext context = new ResourceContext(Resources.State.UNHEALTHY); HotResourceSummary summary = new HotResourceSummary( - ResourceType.newBuilder().setJVM(JvmEnum.YOUNG_GEN).build(), + ResourceUtil.YOUNG_GEN_PROMOTION_RATE, 400, 100, 60); return new ResourceFlowUnit<>(System.currentTimeMillis(), context, summary); } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistorTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistorTest.java index 973a3179e..529e8da41 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistorTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/persistence/SQLitePersistorTest.java @@ -15,14 +15,13 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.persistence; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.JvmEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.RcaTestHelper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.Node; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; import java.io.IOException; @@ -89,7 +88,7 @@ public void write() throws IOException, SQLException, InterruptedException { ResourceContext context = new ResourceContext(Resources.State.UNHEALTHY); HotResourceSummary summary = new HotResourceSummary( - ResourceType.newBuilder().setJVM(JvmEnum.OLD_GEN).build(), + ResourceUtil.OLD_GEN_HEAP_USAGE, 70, 71, 60); diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ClusterDetailsEventProcessorTestHelper.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ClusterDetailsEventProcessorTestHelper.java index 42fb109c3..2b732f199 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ClusterDetailsEventProcessorTestHelper.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ClusterDetailsEventProcessorTestHelper.java @@ -16,6 +16,7 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.NodeRole; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader_writer_shared.Event; import java.sql.SQLException; @@ -34,6 +35,10 @@ public void addNodeDetails(String nodeId, String address, boolean isMasterNode) nodeDetails.add(createNodeDetailsMetrics(nodeId, address, isMasterNode)); } + public void addNodeDetails(String nodeId, String address, NodeRole nodeRole, boolean isMasterNode) { + nodeDetails.add(createNodeDetailsMetrics(nodeId, address, nodeRole, isMasterNode)); + } + public static 
ClusterDetailsEventProcessor.NodeDetails newNodeDetails(final String nodeId, final String address, final boolean isMasterNode) { return createNodeDetails(nodeId, address, isMasterNode); diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/HighHeapUsageClusterRcaTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/HighHeapUsageClusterRcaTest.java index 8318c024e..f365f8ea6 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/HighHeapUsageClusterRcaTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/HighHeapUsageClusterRcaTest.java @@ -15,11 +15,10 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.store.rca; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.JvmEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.GradleTaskForRca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.RcaTestHelper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources.State; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.HighHeapUsageClusterRca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessorTestHelper; import org.junit.Assert; @@ -47,44 +46,40 @@ public void testOperate() { // send three young gen flowunits (healthy, unhealthy, unhealthy) to node1 // the cluterRca will generate three healthy flowunits - nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(buildResourceType(JvmEnum.YOUNG_GEN), "node1", State.HEALTHY)); + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(ResourceUtil.YOUNG_GEN_PROMOTION_RATE, "node1", State.HEALTHY)); 
Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); - nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(buildResourceType(JvmEnum.YOUNG_GEN), "node1", State.UNHEALTHY)); + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(ResourceUtil.YOUNG_GEN_PROMOTION_RATE, "node1", State.UNHEALTHY)); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); - nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(buildResourceType(JvmEnum.YOUNG_GEN), "node1", State.UNHEALTHY)); + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(ResourceUtil.YOUNG_GEN_PROMOTION_RATE, "node1", State.UNHEALTHY)); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); // send two young gen flowunits (unhealthy, unhealthy) to node2 // the cluterRca will continue generating healthy flowunits - nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(buildResourceType(JvmEnum.YOUNG_GEN), "node2", State.UNHEALTHY)); + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(ResourceUtil.YOUNG_GEN_PROMOTION_RATE, "node2", State.UNHEALTHY)); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); - nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(buildResourceType(JvmEnum.YOUNG_GEN), "node2", State.UNHEALTHY)); + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(ResourceUtil.YOUNG_GEN_PROMOTION_RATE, "node2", State.UNHEALTHY)); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); // send two old gen flowunits (unhealthy, unhealthy) to node1 // the cluterRca will continue generating healthy flowunits - nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), "node1", State.UNHEALTHY)); + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, "node1", State.UNHEALTHY)); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); - nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), 
"node1", State.UNHEALTHY)); + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, "node1", State.UNHEALTHY)); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); // send one old gen flowunits (unhealthy) to node1 // the cluterRca will generate a unhealthy flowunit at the end - nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), "node1", State.UNHEALTHY)); + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, "node1", State.UNHEALTHY)); Assert.assertTrue(clusterRca.operate().getResourceContext().isUnhealthy()); // send one young gen flowunits (unhealthy) to node1 // flowunit becomes healthy - nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(buildResourceType(JvmEnum.YOUNG_GEN), "node1", State.UNHEALTHY)); + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(ResourceUtil.YOUNG_GEN_PROMOTION_RATE, "node1", State.UNHEALTHY)); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); // send one old gen flowunits (unhealthy) to node2 // the cluterRca will generate a unhealthy flowunit at the end - nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(buildResourceType(JvmEnum.YOUNG_GEN), "node2", State.UNHEALTHY)); + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(ResourceUtil.YOUNG_GEN_PROMOTION_RATE, "node2", State.UNHEALTHY)); Assert.assertTrue(clusterRca.operate().getResourceContext().isUnhealthy()); } - - private ResourceType buildResourceType(JvmEnum jvmEnum) { - return ResourceType.newBuilder().setJVM(jvmEnum).build(); - } } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/HotNodeClusterRcaTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/HotNodeClusterRcaTest.java index 9cf00e9c9..a75fe38e5 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/HotNodeClusterRcaTest.java +++ 
b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/HotNodeClusterRcaTest.java @@ -18,8 +18,7 @@ import static java.time.Instant.ofEpochMilli; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.JvmEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.GradleTaskForRca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.RcaTestHelper; @@ -29,6 +28,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.HotNodeClusterRca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessorTestHelper; import java.sql.SQLException; @@ -61,16 +61,16 @@ public void testNodeCntThresholdAndTimestampExpiration() { Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault()); clusterRca.setClock(constantClock); - nodeRca.mockFlowUnit(generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), 2, "node1")); + nodeRca.mockFlowUnit(generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, 2, "node1")); // did not collect enough nodes Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); clusterRca.setClock(Clock.offset(constantClock, Duration.ofMinutes(6))); - nodeRca.mockFlowUnit(generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), 8, 
"node2")); + nodeRca.mockFlowUnit(generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, 8, "node2")); // first node expires Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); - nodeRca.mockFlowUnit(generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), 2, "node1")); + nodeRca.mockFlowUnit(generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, 2, "node1")); Assert.assertTrue(clusterRca.operate().getResourceContext().isUnhealthy()); } @@ -81,15 +81,15 @@ public void testCaptureHotNode() { HotNodeClusterRcaX clusterRca = new HotNodeClusterRcaX(1, nodeRca); //medium = 5, below the 30% threshold - nodeRca.mockFlowUnit(generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), 4, "node1")); + nodeRca.mockFlowUnit(generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, 4, "node1")); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); - nodeRca.mockFlowUnit(generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), 5, "node2")); + nodeRca.mockFlowUnit(generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, 5, "node2")); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); - nodeRca.mockFlowUnit(generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), 6, "node3")); + nodeRca.mockFlowUnit(generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, 6, "node3")); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); // 10 is above 5*1.3 - nodeRca.mockFlowUnit(generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), 10, "node1")); + nodeRca.mockFlowUnit(generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, 10, "node1")); fu = clusterRca.operate(); Assert.assertTrue(fu.getResourceContext().isUnhealthy()); Assert.assertTrue(fu.hasResourceSummary()); @@ -102,7 +102,7 @@ public void testCaptureHotNode() { Assert.assertTrue(nodeSummary.getNestedSummaryList().size() > 0); HotResourceSummary resourceSummary = (HotResourceSummary) nodeSummary.getNestedSummaryList().get(0); - 
Assert.assertTrue(resourceSummary.getResourceType().equals(buildResourceType(JvmEnum.OLD_GEN))); + Assert.assertTrue(resourceSummary.getResource().equals(ResourceUtil.OLD_GEN_HEAP_USAGE)); Assert.assertEquals(resourceSummary.getValue(), 10, 0.1); } @@ -113,11 +113,11 @@ public void testFilterNoiseData() { HotNodeClusterRcaX clusterRca = new HotNodeClusterRcaX(1, nodeRca); //medium = 0.2, 0.8 is above the 30% threshold. but since the data is too small, we will drop it - nodeRca.mockFlowUnit(generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), 0.1, "node1")); + nodeRca.mockFlowUnit(generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, 0.1, "node1")); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); - nodeRca.mockFlowUnit(generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), 0.2, "node2")); + nodeRca.mockFlowUnit(generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, 0.2, "node2")); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); - nodeRca.mockFlowUnit(generateFlowUnit(buildResourceType(JvmEnum.OLD_GEN), 0.8, "node3")); + nodeRca.mockFlowUnit(generateFlowUnit(ResourceUtil.OLD_GEN_HEAP_USAGE, 0.8, "node3")); Assert.assertFalse(clusterRca.operate().getResourceContext().isUnhealthy()); } @@ -132,15 +132,11 @@ public void setClock(Clock testClock) { } } - private ResourceFlowUnit generateFlowUnit(ResourceType type, double val, String nodeId) { + private ResourceFlowUnit generateFlowUnit(Resource type, double val, String nodeId) { HotResourceSummary resourceSummary = new HotResourceSummary(type, 10, val, 60); HotNodeSummary nodeSummary = new HotNodeSummary(nodeId, "127.0.0.0"); nodeSummary.appendNestedSummary(resourceSummary); return new ResourceFlowUnit(System.currentTimeMillis(), new ResourceContext(Resources.State.HEALTHY), nodeSummary); } - - private ResourceType buildResourceType(JvmEnum jvmEnum) { - return ResourceType.newBuilder().setJVM(jvmEnum).build(); - } } diff --git 
a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/cluster/BaseClusterRcaTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/cluster/BaseClusterRcaTest.java new file mode 100644 index 000000000..4d0da32c3 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/cluster/BaseClusterRcaTest.java @@ -0,0 +1,294 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.store.rca.cluster; + +import static java.time.Instant.ofEpochMilli; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.NodeRole; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.GradleTaskForRca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.RcaTestHelper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.BaseClusterRca; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessorTestHelper; +import java.sql.SQLException; +import java.time.Clock; +import java.time.Duration; +import java.time.ZoneId; +import java.util.concurrent.TimeUnit; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(GradleTaskForRca.class) +public class BaseClusterRcaTest { + private BaseClusterRca clusterRca; + private RcaTestHelper nodeRca; + private RcaTestHelper nodeRca2; + private Resource type1; + private Resource type2; + private Resource invalidType; + + @Before + public void setupCluster() throws SQLException, ClassNotFoundException { + 
ClusterDetailsEventProcessorTestHelper clusterDetailsEventProcessorTestHelper = new ClusterDetailsEventProcessorTestHelper(); + clusterDetailsEventProcessorTestHelper.addNodeDetails("node1", "127.0.0.0", false); + clusterDetailsEventProcessorTestHelper.addNodeDetails("node2", "127.0.0.1", false); + clusterDetailsEventProcessorTestHelper.addNodeDetails("node3", "127.0.0.2", false); + clusterDetailsEventProcessorTestHelper.addNodeDetails("master", "127.0.0.9", NodeRole.ELECTED_MASTER, true); + clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent(); + } + + @Before + public void init() { + nodeRca = new RcaTestHelper<>("RCA1"); + nodeRca2 = new RcaTestHelper<>("RCA2"); + invalidType = ResourceUtil.OLD_GEN_HEAP_USAGE; + clusterRca = new BaseClusterRca(1, nodeRca, nodeRca2); + type1 = ResourceUtil.OLD_GEN_HEAP_USAGE; + type2 = ResourceUtil.CPU_USAGE; + } + + @Test + public void testUnhealthyFlowunit() throws ClassCastException { + ResourceFlowUnit flowUnit; + nodeRca.mockFlowUnit( + RcaTestHelper.generateFlowUnit(type1, "node1", "127.0.0.0", Resources.State.UNHEALTHY), + RcaTestHelper.generateFlowUnit(type2, "node2", "127.0.0.1", Resources.State.HEALTHY) + ); + + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + HotClusterSummary clusterSummary = flowUnit.getSummary(); + Assert.assertEquals(1, clusterSummary.getNumOfUnhealthyNodes()); + Assert.assertTrue(compareNodeSummary("node1", type1, clusterSummary.getHotNodeSummaryList().get(0))); + + nodeRca.mockFlowUnit( + RcaTestHelper.generateFlowUnit(type1, "node1", "127.0.0.0", Resources.State.HEALTHY), + RcaTestHelper.generateFlowUnit(type2, "node2", "127.0.0.1", Resources.State.HEALTHY) + ); + + flowUnit = clusterRca.operate(); + Assert.assertFalse(flowUnit.getResourceContext().isUnhealthy()); + + nodeRca.mockFlowUnit( + RcaTestHelper.generateFlowUnit(type1, "node1", "127.0.0.0", Resources.State.UNHEALTHY), + RcaTestHelper.generateFlowUnit(type2, "node2", 
"127.0.0.1", Resources.State.UNHEALTHY) + ); + + flowUnit = clusterRca.operate(); + + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + clusterSummary = flowUnit.getSummary(); + Assert.assertEquals(2, clusterSummary.getNumOfUnhealthyNodes()); + if (compareNodeSummary("node1", type1, clusterSummary.getHotNodeSummaryList().get(0))) { + Assert.assertTrue(compareNodeSummary("node2", type2, clusterSummary.getHotNodeSummaryList().get(1))); + } + else { // node1 is not first in the list, so node2 must be at index 0 and node1 at index 1 + Assert.assertTrue(compareNodeSummary("node1", type1, clusterSummary.getHotNodeSummaryList().get(1))); + Assert.assertTrue(compareNodeSummary("node2", type2, clusterSummary.getHotNodeSummaryList().get(0))); + } + } + + @Test + public void testMultipleRcas() throws ClassCastException { + ResourceFlowUnit flowUnit; + nodeRca.mockFlowUnit( + RcaTestHelper.generateFlowUnit(type1, "node1", "127.0.0.0", Resources.State.UNHEALTHY), + RcaTestHelper.generateFlowUnit(type1, "node2", "127.0.0.1", Resources.State.HEALTHY) + ); + + nodeRca2.mockFlowUnit( + RcaTestHelper.generateFlowUnit(type2, "node1", "127.0.0.0", Resources.State.HEALTHY), + RcaTestHelper.generateFlowUnit(type2, "node2", "127.0.0.1", Resources.State.HEALTHY) + ); + + flowUnit = clusterRca.operate(); + 
Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + clusterSummary = flowUnit.getSummary(); + Assert.assertEquals(2, clusterSummary.getNumOfUnhealthyNodes()); + Assert.assertTrue(compareNodeSummary("node1", type1, clusterSummary.getHotNodeSummaryList().get(0))); + Assert.assertTrue(compareNodeSummary("node2", type2, clusterSummary.getHotNodeSummaryList().get(1))); + + nodeRca.mockFlowUnit( + RcaTestHelper.generateFlowUnit(type1, "node1", "127.0.0.0", Resources.State.HEALTHY), + RcaTestHelper.generateFlowUnit(type1, "node2", "127.0.0.1", Resources.State.HEALTHY) + ); + + nodeRca2.mockFlowUnit( + RcaTestHelper.generateFlowUnit(type2, "node1", "127.0.0.0", Resources.State.HEALTHY), + RcaTestHelper.generateFlowUnit(type2, "node2", "127.0.0.1", Resources.State.UNHEALTHY) + ); + + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + clusterSummary = flowUnit.getSummary(); + Assert.assertEquals(1, clusterSummary.getNumOfUnhealthyNodes()); + Assert.assertTrue(compareNodeSummary("node2", type2, clusterSummary.getHotNodeSummaryList().get(0))); + } + + @Test + public void testTableEntryExpire() { + Clock constantClock = Clock.fixed(ofEpochMilli(0), ZoneId.systemDefault()); + ResourceFlowUnit flowUnit; + + clusterRca.setClock(constantClock); + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(type1, "node1", "127.0.0.0", Resources.State.UNHEALTHY, 0)); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + Assert.assertEquals(1, flowUnit.getSummary().getNumOfUnhealthyNodes()); + + clusterRca.setClock(Clock.offset(constantClock, Duration.ofMinutes(3))); + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(type2, "node2", "127.0.0.1", Resources.State.UNHEALTHY, + TimeUnit.MINUTES.toMillis(3))); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + Assert.assertEquals(2, flowUnit.getSummary().getNumOfUnhealthyNodes()); + 
+ clusterRca.setClock(Clock.offset(constantClock, Duration.ofMinutes(11))); + nodeRca.mockFlowUnit(); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + Assert.assertEquals(1, flowUnit.getSummary().getNumOfUnhealthyNodes()); + + clusterRca.setClock(Clock.offset(constantClock, Duration.ofMinutes(14))); + nodeRca.mockFlowUnit(); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isHealthy()); + } + + @Test + public void testCollectFromMasterNode() { + ResourceFlowUnit flowUnit; + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(type1, "master", "127.0.0.9", Resources.State.UNHEALTHY)); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isHealthy()); + + clusterRca.setCollectFromMasterNode(true); + nodeRca.mockFlowUnit(); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isHealthy()); + + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(type1, "master", "127.0.0.9", Resources.State.UNHEALTHY)); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + Assert.assertEquals(1, flowUnit.getSummary().getNumOfUnhealthyNodes()); + Assert.assertEquals(4, flowUnit.getSummary().getNumOfNodes()); + Assert.assertTrue(compareNodeSummary("master", type1, flowUnit.getSummary().getHotNodeSummaryList().get(0))); + } + + @Test + public void testRemoveNodeFromCluster() throws SQLException, ClassNotFoundException { + ResourceFlowUnit flowUnit; + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(type1, "node1", "127.0.0.0", Resources.State.UNHEALTHY)); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(type2, "node2", "127.0.0.1", Resources.State.UNHEALTHY)); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + 
Assert.assertEquals(2, flowUnit.getSummary().getNumOfUnhealthyNodes()); + + removeNodeFromCluster(); + + nodeRca.mockFlowUnit(); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + Assert.assertEquals(1, flowUnit.getSummary().getNumOfUnhealthyNodes()); + Assert.assertTrue(compareNodeSummary("node2", type2, flowUnit.getSummary().getHotNodeSummaryList().get(0))); + } + + @Test + public void testAddNewNodeIntoCluster() throws SQLException, ClassNotFoundException { + ResourceFlowUnit flowUnit; + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(type1, "node1", "127.0.0.0", Resources.State.UNHEALTHY)); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(type2, "node4", "127.0.0.3", Resources.State.UNHEALTHY)); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + Assert.assertEquals(1, flowUnit.getSummary().getNumOfUnhealthyNodes()); + Assert.assertTrue(compareNodeSummary("node1", type1, flowUnit.getSummary().getHotNodeSummaryList().get(0))); + + addNewNodeIntoCluster(); + + nodeRca.mockFlowUnit(); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + Assert.assertEquals(1, flowUnit.getSummary().getNumOfUnhealthyNodes()); + Assert.assertTrue(compareNodeSummary("node1", type1, flowUnit.getSummary().getHotNodeSummaryList().get(0))); + + nodeRca.mockFlowUnit(RcaTestHelper.generateFlowUnit(type2, "node4", "127.0.0.3",Resources.State.UNHEALTHY)); + flowUnit = clusterRca.operate(); + Assert.assertTrue(flowUnit.getResourceContext().isUnhealthy()); + HotClusterSummary clusterSummary = flowUnit.getSummary(); + Assert.assertEquals(2, clusterSummary.getNumOfUnhealthyNodes()); + Assert.assertTrue(compareNodeSummary("node1", type1, clusterSummary.getHotNodeSummaryList().get(0))); + Assert.assertTrue(compareNodeSummary("node4", 
type2, clusterSummary.getHotNodeSummaryList().get(1))); + } + + private void removeNodeFromCluster() throws SQLException, ClassNotFoundException { + ClusterDetailsEventProcessorTestHelper clusterDetailsEventProcessorTestHelper = new ClusterDetailsEventProcessorTestHelper(); + clusterDetailsEventProcessorTestHelper.addNodeDetails("node2", "127.0.0.1", false); + clusterDetailsEventProcessorTestHelper.addNodeDetails("node3", "127.0.0.2", false); + clusterDetailsEventProcessorTestHelper.addNodeDetails("master", "127.0.0.9", NodeRole.ELECTED_MASTER, true); + clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent(); + } + + private void addNewNodeIntoCluster() throws SQLException, ClassNotFoundException { + ClusterDetailsEventProcessorTestHelper clusterDetailsEventProcessorTestHelper = new ClusterDetailsEventProcessorTestHelper(); + clusterDetailsEventProcessorTestHelper.addNodeDetails("node1", "127.0.0.0", false); + clusterDetailsEventProcessorTestHelper.addNodeDetails("node2", "127.0.0.1", false); + clusterDetailsEventProcessorTestHelper.addNodeDetails("node3", "127.0.0.2", false); + clusterDetailsEventProcessorTestHelper.addNodeDetails("node4", "127.0.0.3", false); + clusterDetailsEventProcessorTestHelper.addNodeDetails("master", "127.0.0.9", NodeRole.ELECTED_MASTER, true); + clusterDetailsEventProcessorTestHelper.generateClusterDetailsEvent(); + } + + private boolean compareResourceSummary(Resource resource, HotResourceSummary resourceSummary) { + return resourceSummary.getResource().equals(resource); + } + + private boolean compareNodeSummary(String nodeId, Resource resource, HotNodeSummary nodeSummary) { + if (!nodeId.equals(nodeSummary.getNodeID()) || nodeSummary.getHotResourceSummaryList().isEmpty()) { + return false; + } + return compareResourceSummary(resource, nodeSummary.getHotResourceSummaryList().get(0)); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/hotshard/HotShardClusterRcaTest.java 
b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/hotshard/HotShardClusterRcaTest.java index f9e41a3e1..afbb1c0ea 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/hotshard/HotShardClusterRcaTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/hotshard/HotShardClusterRcaTest.java @@ -15,14 +15,12 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.store.rca.hotshard; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.HardwareEnum; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.GradleTaskForRca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.RcaTestHelper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Resources; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceTypeUtil; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.hotshard.HotShardClusterRca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessorTestHelper; @@ -45,13 +43,6 @@ public class HotShardClusterRcaTest { private ClusterDetailsEventProcessorTestHelper clusterDetailsEventProcessorTestHelper; - private ResourceType cpuResourceType = ResourceType.newBuilder().setHardwareResourceTypeValue( - HardwareEnum.CPU_VALUE).build(); - private ResourceType 
ioTotalThroughputResourceType = ResourceType.newBuilder().setHardwareResourceTypeValue( - HardwareEnum.IO_TOTAL_THROUGHPUT_VALUE).build(); - private ResourceType ioTotalSysCallRateResourceType = ResourceType.newBuilder().setHardwareResourceTypeValue( - HardwareEnum.IO_TOTAL_SYS_CALLRATE_VALUE).build(); - private enum index { index_1, index_2 @@ -179,16 +170,16 @@ public void testOperateForHotShardonSingleDimension() { List hotShard2 = nodeSummary.getNestedSummaryList().get(1).getSqlValue(); // verify the resource type, cpu utilization value, node ID, Index Name, shard ID - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(cpuResourceType), hotShard1.get(0)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(cpuResourceType), hotShard2.get(0)); + Assert.assertEquals(ResourceUtil.CPU_USAGE.getResourceEnumValue(), hotShard1.get(0)); + Assert.assertEquals(ResourceUtil.CPU_USAGE.getResourceEnumValue(), hotShard2.get(0)); - Assert.assertEquals(0.75, hotShard1.get(2)); + Assert.assertEquals(0.75, hotShard1.get(3)); String [] nodeIndexShardInfo1 = hotShard1.get(8).toString().split(" "); Assert.assertEquals(node.node_1.name(), nodeIndexShardInfo1[0]); Assert.assertEquals(index.index_1.name(), nodeIndexShardInfo1[1]); Assert.assertEquals(shard.shard_1.name(), nodeIndexShardInfo1[2]); - Assert.assertEquals(0.80, hotShard2.get(2)); + Assert.assertEquals(0.80, hotShard2.get(3)); String [] nodeIndexShardInfo2 = hotShard2.get(8).toString().split(" "); Assert.assertEquals(node.node_2.name(), nodeIndexShardInfo2[0]); Assert.assertEquals(index.index_2.name(), nodeIndexShardInfo2[1]); @@ -226,16 +217,16 @@ public void testOperateForHotShardonSingleDimension() { List hotShard4 = nodeSummary.getNestedSummaryList().get(1).getSqlValue(); // verify the resource type, IO total throughput, node ID, Index Name, shard ID - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(ioTotalThroughputResourceType), hotShard3.get(0)); - 
Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(ioTotalThroughputResourceType), hotShard4.get(0)); + Assert.assertEquals(ResourceUtil.IO_TOTAL_THROUGHPUT.getResourceEnumValue(), hotShard3.get(0)); + Assert.assertEquals(ResourceUtil.IO_TOTAL_THROUGHPUT.getResourceEnumValue(), hotShard4.get(0)); - Assert.assertEquals(550000.0, hotShard3.get(2)); + Assert.assertEquals(550000.0, hotShard3.get(3)); String [] nodeIndexShardInfo3 = hotShard3.get(8).toString().split(" "); Assert.assertEquals(node.node_1.name(), nodeIndexShardInfo3[0]); Assert.assertEquals(index.index_1.name(), nodeIndexShardInfo3[1]); Assert.assertEquals(shard.shard_2.name(), nodeIndexShardInfo3[2]); - Assert.assertEquals(650000.0, hotShard4.get(2)); + Assert.assertEquals(650000.0, hotShard4.get(3)); String [] nodeIndexShardInfo4 = hotShard4.get(8).toString().split(" "); Assert.assertEquals(node.node_1.name(), nodeIndexShardInfo4[0]); Assert.assertEquals(index.index_2.name(), nodeIndexShardInfo4[1]); @@ -270,16 +261,16 @@ public void testOperateForHotShardonSingleDimension() { List hotShard6 = nodeSummary.getNestedSummaryList().get(1).getSqlValue(); // verify the resource type, IO total sys callrate, node ID, Index Name, shard ID - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(ioTotalSysCallRateResourceType), hotShard5.get(0)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(ioTotalSysCallRateResourceType), hotShard6.get(0)); + Assert.assertEquals(ResourceUtil.IO_TOTAL_SYS_CALLRATE.getResourceEnumValue(), hotShard5.get(0)); + Assert.assertEquals(ResourceUtil.IO_TOTAL_SYS_CALLRATE.getResourceEnumValue(), hotShard6.get(0)); - Assert.assertEquals(0.75, hotShard5.get(2)); + Assert.assertEquals(0.75, hotShard5.get(3)); String [] nodeIndexShardInfo5 = hotShard5.get(8).toString().split(" "); Assert.assertEquals(node.node_1.name(), nodeIndexShardInfo5[0]); Assert.assertEquals(index.index_1.name(), nodeIndexShardInfo5[1]); Assert.assertEquals(shard.shard_1.name(), 
nodeIndexShardInfo5[2]); - Assert.assertEquals(0.50, hotShard6.get(2)); + Assert.assertEquals(0.50, hotShard6.get(3)); String [] nodeIndexShardInfo6 = hotShard6.get(8).toString().split(" "); Assert.assertEquals(node.node_2.name(), nodeIndexShardInfo6[0]); Assert.assertEquals(index.index_2.name(), nodeIndexShardInfo6[1]); @@ -318,30 +309,30 @@ public void testOperateForHotShardonMultipleDimension() { List hotShard2 = nodeSummary.getNestedSummaryList().get(1).getSqlValue(); List hotShard3 = nodeSummary.getNestedSummaryList().get(2).getSqlValue(); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(cpuResourceType), hotShard1.get(0)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(ioTotalThroughputResourceType), hotShard2.get(0)); - Assert.assertEquals(ResourceTypeUtil.getResourceTypeName(ioTotalSysCallRateResourceType), hotShard3.get(0)); + Assert.assertEquals(ResourceUtil.CPU_USAGE.getResourceEnumValue(), hotShard1.get(0)); + Assert.assertEquals(ResourceUtil.IO_TOTAL_THROUGHPUT.getResourceEnumValue(), hotShard2.get(0)); + Assert.assertEquals(ResourceUtil.IO_TOTAL_SYS_CALLRATE.getResourceEnumValue(), hotShard3.get(0)); // verify the resource type, cpu utilization value, node ID, Index Name, shard ID - Assert.assertEquals(0.75, hotShard1.get(2)); + Assert.assertEquals(0.75, hotShard1.get(3)); String [] nodeIndexShardInfo1 = hotShard1.get(8).toString().split(" "); Assert.assertEquals(node.node_1.name(), nodeIndexShardInfo1[0]); Assert.assertEquals(index.index_1.name(), nodeIndexShardInfo1[1]); Assert.assertEquals(shard.shard_1.name(), nodeIndexShardInfo1[2]); // verify the resource type, IO total throughput, node ID, Index Name, shard ID - Assert.assertEquals(560000.0, hotShard2.get(2)); + Assert.assertEquals(560000.0, hotShard2.get(3)); String [] nodeIndexShardInfo2 = hotShard2.get(8).toString().split(" "); Assert.assertEquals(node.node_1.name(), nodeIndexShardInfo2[0]); Assert.assertEquals(index.index_1.name(), nodeIndexShardInfo2[1]); 
Assert.assertEquals(shard.shard_2.name(), nodeIndexShardInfo2[2]); // verify the resource type, IO total sys callrate, node ID, Index Name, shard ID - Assert.assertEquals(0.50, hotShard3.get(2)); + Assert.assertEquals(0.50, hotShard3.get(3)); String [] nodeIndexShardInfo3 = hotShard3.get(8).toString().split(" "); Assert.assertEquals(node.node_2.name(), nodeIndexShardInfo3[0]); Assert.assertEquals(index.index_2.name(), nodeIndexShardInfo3[1]); Assert.assertEquals(shard.shard_2.name(), nodeIndexShardInfo3[2]); } -} \ No newline at end of file +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/threadpool/QueueRejectionRcaTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/threadpool/QueueRejectionRcaTest.java index db89cebd8..dda9c9202 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/threadpool/QueueRejectionRcaTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/store/rca/threadpool/QueueRejectionRcaTest.java @@ -18,7 +18,6 @@ import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE; import static java.time.Instant.ofEpochMilli; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ThreadPoolEnum; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolType; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.GradleTaskForRca; @@ -26,6 +25,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.MetricTestHelper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; +import 
com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.threadpool.QueueRejectionRca; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessorTestHelper; import java.time.Clock; @@ -102,7 +102,7 @@ public void testWriteQueueOnly() { Assert.assertEquals(1, nodeSummary.getNestedSummaryList().size()); Assert.assertEquals(1, nodeSummary.getHotResourceSummaryList().size()); HotResourceSummary resourceSummary = nodeSummary.getHotResourceSummaryList().get(0); - Assert.assertEquals(ThreadPoolEnum.WRITE_QUEUE, resourceSummary.getResourceType().getThreadPool()); + Assert.assertEquals(ResourceUtil.WRITE_QUEUE_REJECTION, resourceSummary.getResource()); Assert.assertEquals(0.01, 6.0, resourceSummary.getValue()); mockFlowUnits(0, 0); @@ -141,10 +141,10 @@ public void testWriteAndSearchQueues() { Assert.assertEquals(2, nodeSummary.getNestedSummaryList().size()); Assert.assertEquals(2, nodeSummary.getHotResourceSummaryList().size()); HotResourceSummary resourceSummary = nodeSummary.getHotResourceSummaryList().get(1); - Assert.assertEquals(ThreadPoolEnum.SEARCH_QUEUE, resourceSummary.getResourceType().getThreadPool()); + Assert.assertEquals(ResourceUtil.SEARCH_QUEUE_REJECTION, resourceSummary.getResource()); Assert.assertEquals(0.01, 9.0, resourceSummary.getValue()); resourceSummary = nodeSummary.getHotResourceSummaryList().get(0); - Assert.assertEquals(ThreadPoolEnum.WRITE_QUEUE, resourceSummary.getResourceType().getThreadPool()); + Assert.assertEquals(ResourceUtil.WRITE_QUEUE_REJECTION, resourceSummary.getResource()); Assert.assertEquals(0.01, 7.0, resourceSummary.getValue()); } }