diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java
index 5731613c5..d942a74dc 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java
@@ -1154,6 +1154,14 @@ public String toString() {
}
}
+ /*
+ * column names of Shard_State table
+ * IndexName | ShardID | ShardType | NodeName | Shard_State | sum | avg | min |max
+ *
+ *
Example:
+ * pmc|4|p|elasticsearch1|UNASSIGNED|1.0|1.0|1.0|1.0
+ * pmc|2|p|elasticsearch2|INITIALIZING|1.0|1.0|1.0|1.0
+ */
public enum ShardStateDimension implements MetricDimension {
INDEX_NAME(CommonDimension.INDEX_NAME.toString()),
SHARD_ID(CommonDimension.SHARD_ID.toString()),
@@ -1179,6 +1187,27 @@ public static class Constants {
}
}
+ public enum ShardType {
+ SHARD_PRIMARY(Constants.SHARD_PRIMARY),
+ SHARD_REPLICA(Constants.SHARD_REPLICA);
+
+ private final String value;
+
+ ShardType(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ public static class Constants {
+ public static final String SHARD_PRIMARY = "p";
+ public static final String SHARD_REPLICA = "r";
+ }
+ }
+
public enum ShardStateValue implements MetricValue {
SHARD_STATE(Constants.SHARD_STATE);
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java
index 17747fcfb..495baec8a 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java
@@ -61,8 +61,6 @@ public class PerformanceAnalyzerMetrics {
public static final String MASTER_CURRENT = "current";
public static final String MASTER_META_DATA = "metadata";
public static final String METRIC_CURRENT_TIME = "current_time";
- public static final String SHARD_PRIMARY = "p";
- public static final String SHARD_REPLICA = "r";
public static final int QUEUE_SIZE = PluginSettings.instance().getWriterQueueSize();
// TODO: Comeup with a more sensible number.
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java
index 85b33cd99..613bbb56a 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java
@@ -341,10 +341,9 @@ public class MetricsModel {
MetricUnits.MILLISECOND.toString(), AllMetrics.MasterMetricDimensions.values()));
allMetricsInitializer.put(
- AllMetrics.ShardStateValue.SHARD_STATE.toString(),
- new MetricAttributes(
- MetricUnits.COUNT.toString(), AllMetrics.ShardStateDimension.values()));
-
+ AllMetrics.ShardStateValue.SHARD_STATE.toString(),
+ new MetricAttributes(
+ MetricUnits.COUNT.toString(), AllMetrics.ShardStateDimension.values()));
ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer);
}
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java
index 0ce9b50ed..ed7fd1ced 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.stats.eval.Statistics;
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java
index eeee874c0..e48b7d9fe 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java
@@ -16,6 +16,7 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.DBUtils;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.TroubleshootingConfig;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonMetric;
@@ -32,6 +33,8 @@
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ReaderMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.BatchBindStep;
@@ -87,6 +90,17 @@ public class MetricsEmitter {
}
};
+ private static final List SHARD_STATE_TABLE_DIMENSIONS =
+ new ArrayList() {
+ {
+ this.add(AllMetrics.ShardStateDimension.INDEX_NAME.toString());
+ this.add(AllMetrics.ShardStateDimension.SHARD_ID.toString());
+ this.add(AllMetrics.ShardStateDimension.SHARD_TYPE.toString());
+ this.add(AllMetrics.ShardStateDimension.NODE_NAME.toString());
+ this.add(AllMetrics.ShardStateDimension.SHARD_STATE.toString());
+ }
+ };
+
public static void emitAggregatedOSMetrics(
final DSLContext create,
final MetricsDB db,
@@ -852,74 +866,33 @@ public static void emitNodeMetrics(
public static void emitShardStateMetric(
MetricsDB metricsDB, ShardStateMetricsSnapshot shardStateMetricsSnapshot) {
long mCurrT = System.currentTimeMillis();
- Result shardStateMetrics = shardStateMetricsSnapshot.fetchAggregatedShardStateMetrics();
- List dims =
- new ArrayList() {
- {
- this.add(AllMetrics.ShardStateDimension.INDEX_NAME.toString());
- this.add(AllMetrics.ShardStateDimension.SHARD_ID.toString());
- this.add(AllMetrics.ShardStateDimension.SHARD_TYPE.toString());
- this.add(AllMetrics.ShardStateDimension.NODE_NAME.toString());
- this.add(AllMetrics.ShardStateDimension.SHARD_STATE.toString());
- }
- };
+ Result shardStateMetrics = shardStateMetricsSnapshot.fetchAll();
metricsDB.createMetric(
new Metric(AllMetrics.ShardStateValue.SHARD_STATE.toString(), 0d),
- dims);
+ SHARD_STATE_TABLE_DIMENSIONS);
BatchBindStep handle =
metricsDB.startBatchPut(
new Metric(AllMetrics.ShardStateValue.SHARD_STATE.toString(), 0d),
- dims);
+ SHARD_STATE_TABLE_DIMENSIONS);
for (Record r : shardStateMetrics) {
-
- Double sumShardState =
- Double.parseDouble(
- r.get(
- DBUtils.getAggFieldName(
- AllMetrics.ShardStateValue.SHARD_STATE.toString(),
- MetricsDB.SUM))
- .toString());
-
- Double avgShardState =
- Double.parseDouble(
- r.get(
- DBUtils.getAggFieldName(
- AllMetrics.ShardStateValue.SHARD_STATE.toString(),
- MetricsDB.AVG))
- .toString());
-
- Double minShardState =
- Double.parseDouble(
- r.get(
- DBUtils.getAggFieldName(
- AllMetrics.ShardStateValue.SHARD_STATE.toString(),
- MetricsDB.MIN))
- .toString());
-
- Double maxShardState =
- Double.parseDouble(
- r.get(
- DBUtils.getAggFieldName(
- AllMetrics.ShardStateValue.SHARD_STATE.toString(),
- MetricsDB.MAX))
- .toString());
-
handle.bind(
r.get(AllMetrics.ShardStateDimension.INDEX_NAME.toString()).toString(),
r.get(AllMetrics.ShardStateDimension.SHARD_ID.toString()).toString(),
r.get(AllMetrics.ShardStateDimension.SHARD_TYPE.toString()).toString(),
r.get(AllMetrics.ShardStateDimension.NODE_NAME.toString()).toString(),
r.get(AllMetrics.ShardStateDimension.SHARD_STATE.toString()).toString(),
- sumShardState,
- avgShardState,
- minShardState,
- maxShardState);
+ 1.0,
+ 1.0,
+ 1.0,
+ 1.0);
}
handle.execute();
long mFinalT = System.currentTimeMillis();
LOG.debug(
"Total time taken for writing shard state event queue metrics metricsdb: {}", mFinalT - mCurrT);
+ PerformanceAnalyzerApp.READER_METRICS_AGGREGATOR.updateStat(ReaderMetrics.SHARD_STATE_EMITTER_EXECUTION_TIME,
+ "", mFinalT - mCurrT);
}
}
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsProcessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsProcessor.java
index c2dd53241..a20146687 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsProcessor.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsProcessor.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
@@ -23,6 +38,8 @@ public class ShardStateMetricsProcessor implements EventProcessor {
private BatchBindStep handle;
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final TypeReference> TYPE_REF = new TypeReference>() {};
+ private long startTime;
+ private long endTime;
private ShardStateMetricsProcessor(ShardStateMetricsSnapshot snapshot) {
this.shardStateMetricsSnapshot = snapshot;
@@ -31,7 +48,8 @@ private ShardStateMetricsProcessor(ShardStateMetricsSnapshot snapshot) {
static ShardStateMetricsProcessor buildShardStateMetricEventsProcessor(
long currWindowStartTime,
Connection conn,
- NavigableMap shardStateEventMetricsMap) {
+ NavigableMap shardStateEventMetricsMap) {
ShardStateMetricsSnapshot shardStateSnap = shardStateEventMetricsMap.get(currWindowStartTime);
if (shardStateSnap == null) {
shardStateSnap = new ShardStateMetricsSnapshot(conn, currWindowStartTime);
@@ -43,6 +61,8 @@ static ShardStateMetricsProcessor buildShardStateMetricEventsProcessor(
@Override
public void initializeProcessing(long startTime, long endTime) {
this.handle = shardStateMetricsSnapshot.startBatchPut();
+ this.startTime = startTime;
+ this.endTime = endTime;
}
@Override
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshot.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshot.java
index 509a19a4d..6772be475 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshot.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshot.java
@@ -1,8 +1,21 @@
+/*
+ * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader;
-import com.amazon.opendistro.elasticsearch.performanceanalyzer.DBUtils;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
-import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
import com.google.common.annotations.VisibleForTesting;
import java.sql.Connection;
@@ -26,7 +39,6 @@ public class ShardStateMetricsSnapshot implements Removable {
private static final Logger LOG = LogManager.getLogger(ShardStateMetricsSnapshot.class);
private final DSLContext create;
private final String tableName;
- private static final Long EXPIRE_AFTER = 1200000L;
private static final List> columns =
new ArrayList>() {
{
@@ -89,70 +101,4 @@ public void putMetrics(String shard_state, Map dimensions) {
.set(dimensionMap)
.execute();
}
-
- /** This method returns the aggregated ShardState metrics with Shard State column as the value and dummy values
- * "1.0" in aggreagted columns[sum, avg, min and max]
- * @return Result of records.
- */
- public Result fetchAggregatedShardStateMetrics() {
- List> fields = new ArrayList>() {
- {
- this.add(
- DSL.field(
- DSL.name(AllMetrics.ShardStateDimension.INDEX_NAME.toString()),
- String.class)
- );
- this.add(
- DSL.field(
- DSL.name(AllMetrics.ShardStateDimension.SHARD_ID.toString()),
- String.class)
- );
- this.add(
- DSL.field(
- DSL.name(AllMetrics.ShardStateDimension.SHARD_TYPE.toString()),
- String.class)
- );
- this.add(
- DSL.field(
- DSL.name(AllMetrics.ShardStateDimension.NODE_NAME.toString()),
- String.class)
- );
- this.add(
- DSL.field(
- DSL.name(AllMetrics.ShardStateDimension.SHARD_STATE.toString()),
- String.class)
- );
- this.add(
- DSL.val(1.0)
- .as(
- DBUtils.getAggFieldName(
- AllMetrics.ShardStateValue.SHARD_STATE.toString(),
- MetricsDB.SUM))
- );
- this.add(
- DSL.val(1.0)
- .as(
- DBUtils.getAggFieldName(
- AllMetrics.ShardStateValue.SHARD_STATE.toString(),
- MetricsDB.AVG))
- );
- this.add(
- DSL.val(1.0)
- .as(
- DBUtils.getAggFieldName(
- AllMetrics.ShardStateValue.SHARD_STATE.toString(),
- MetricsDB.MIN))
- );
- this.add(
- DSL.val(1.0)
- .as(
- DBUtils.getAggFieldName(
- AllMetrics.ShardStateValue.SHARD_STATE.toString(),
- MetricsDB.MAX))
- );
- }
- };
- return create.select(fields).from(DSL.table(this.tableName)).fetch();
- }
-
}
diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshotTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshotTest.java
index b6fb73371..1f6cc1333 100644
--- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshotTest.java
+++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshotTest.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
@@ -31,7 +46,7 @@ public void testPutMetrics() {
handle.bind("indexName", "shardId", "p","nodeName","Unassigned");
handle.execute();
- Result rt = shardStateMetricsSnapshot.fetchAggregatedShardStateMetrics();
+ Result rt = shardStateMetricsSnapshot.fetchAll();
assertEquals(1, rt.size());
String shard_state = rt.get(0).get(AllMetrics.ShardStateValue.SHARD_STATE.toString()).toString();