From 36c69dc57d6b945933bffe04705d21e4c44ded19 Mon Sep 17 00:00:00 2001 From: Arpita Date: Tue, 20 Oct 2020 14:40:08 +0530 Subject: [PATCH] Publish Shard State Metrics --- .../metrics/AllMetrics.java | 29 +++++++ .../metrics/PerformanceAnalyzerMetrics.java | 2 - .../model/MetricsModel.java | 7 +- .../rca/framework/metrics/WriterMetrics.java | 15 ++++ .../reader/MetricsEmitter.java | 73 +++++----------- .../reader/ShardStateMetricsProcessor.java | 22 ++++- .../reader/ShardStateMetricsSnapshot.java | 84 ++++--------------- .../reader/ShardStateMetricsSnapshotTest.java | 17 +++- 8 files changed, 122 insertions(+), 127 deletions(-) diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java index 5731613c5..d942a74dc 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java @@ -1154,6 +1154,14 @@ public String toString() { } } + /* + * column names of Shard_State table + * IndexName | ShardID | ShardType | NodeName | Shard_State | sum | avg | min |max + * + *

Example: + * pmc|4|p|elasticsearch1|UNASSIGNED|1.0|1.0|1.0|1.0 + * pmc|2|p|elasticsearch2|INITIALIZING|1.0|1.0|1.0|1.0 + */ public enum ShardStateDimension implements MetricDimension { INDEX_NAME(CommonDimension.INDEX_NAME.toString()), SHARD_ID(CommonDimension.SHARD_ID.toString()), @@ -1179,6 +1187,27 @@ public static class Constants { } } + public enum ShardType { + SHARD_PRIMARY(Constants.SHARD_PRIMARY), + SHARD_REPLICA(Constants.SHARD_REPLICA); + + private final String value; + + ShardType(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + public static class Constants { + public static final String SHARD_PRIMARY = "p"; + public static final String SHARD_REPLICA = "r"; + } + } + public enum ShardStateValue implements MetricValue { SHARD_STATE(Constants.SHARD_STATE); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java index 17747fcfb..495baec8a 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java @@ -61,8 +61,6 @@ public class PerformanceAnalyzerMetrics { public static final String MASTER_CURRENT = "current"; public static final String MASTER_META_DATA = "metadata"; public static final String METRIC_CURRENT_TIME = "current_time"; - public static final String SHARD_PRIMARY = "p"; - public static final String SHARD_REPLICA = "r"; public static final int QUEUE_SIZE = PluginSettings.instance().getWriterQueueSize(); // TODO: Comeup with a more sensible number. diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java index 85b33cd99..613bbb56a 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java @@ -341,10 +341,9 @@ public class MetricsModel { MetricUnits.MILLISECOND.toString(), AllMetrics.MasterMetricDimensions.values())); allMetricsInitializer.put( - AllMetrics.ShardStateValue.SHARD_STATE.toString(), - new MetricAttributes( - MetricUnits.COUNT.toString(), AllMetrics.ShardStateDimension.values())); - + AllMetrics.ShardStateValue.SHARD_STATE.toString(), + new MetricAttributes( + MetricUnits.COUNT.toString(), AllMetrics.ShardStateDimension.values())); ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer); } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java index 0ce9b50ed..ed7fd1ced 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java @@ -1,3 +1,18 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.stats.eval.Statistics; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java index eeee874c0..e48b7d9fe 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java @@ -16,6 +16,7 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; import com.amazon.opendistro.elasticsearch.performanceanalyzer.DBUtils; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.TroubleshootingConfig; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonMetric; @@ -32,6 +33,8 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ReaderMetrics; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jooq.BatchBindStep; @@ -87,6 +90,17 @@ public class MetricsEmitter { } }; + private static final List SHARD_STATE_TABLE_DIMENSIONS = + new ArrayList() { + { + this.add(AllMetrics.ShardStateDimension.INDEX_NAME.toString()); + this.add(AllMetrics.ShardStateDimension.SHARD_ID.toString()); + this.add(AllMetrics.ShardStateDimension.SHARD_TYPE.toString()); + this.add(AllMetrics.ShardStateDimension.NODE_NAME.toString()); + this.add(AllMetrics.ShardStateDimension.SHARD_STATE.toString()); + } + }; + public static void emitAggregatedOSMetrics( final DSLContext create, final MetricsDB db, @@ -852,74 +866,33 @@ public static void emitNodeMetrics( public static void emitShardStateMetric( MetricsDB metricsDB, ShardStateMetricsSnapshot shardStateMetricsSnapshot) { long mCurrT = System.currentTimeMillis(); - Result shardStateMetrics = shardStateMetricsSnapshot.fetchAggregatedShardStateMetrics(); - List dims = - new ArrayList() { - { - this.add(AllMetrics.ShardStateDimension.INDEX_NAME.toString()); - this.add(AllMetrics.ShardStateDimension.SHARD_ID.toString()); - this.add(AllMetrics.ShardStateDimension.SHARD_TYPE.toString()); - this.add(AllMetrics.ShardStateDimension.NODE_NAME.toString()); - this.add(AllMetrics.ShardStateDimension.SHARD_STATE.toString()); - } - }; + Result shardStateMetrics = shardStateMetricsSnapshot.fetchAll(); metricsDB.createMetric( new Metric(AllMetrics.ShardStateValue.SHARD_STATE.toString(), 0d), - dims); + SHARD_STATE_TABLE_DIMENSIONS); BatchBindStep handle = metricsDB.startBatchPut( new Metric(AllMetrics.ShardStateValue.SHARD_STATE.toString(), 0d), - dims); + SHARD_STATE_TABLE_DIMENSIONS); for (Record r : shardStateMetrics) { - - Double sumShardState = - Double.parseDouble( - r.get( - DBUtils.getAggFieldName( - AllMetrics.ShardStateValue.SHARD_STATE.toString(), - MetricsDB.SUM)) - .toString()); - - Double avgShardState = - Double.parseDouble( - r.get( - DBUtils.getAggFieldName( - AllMetrics.ShardStateValue.SHARD_STATE.toString(), - MetricsDB.AVG)) - .toString()); - - Double minShardState = - Double.parseDouble( - r.get( - DBUtils.getAggFieldName( - AllMetrics.ShardStateValue.SHARD_STATE.toString(), - MetricsDB.MIN)) - .toString()); - - Double maxShardState = - Double.parseDouble( - r.get( - DBUtils.getAggFieldName( - AllMetrics.ShardStateValue.SHARD_STATE.toString(), - MetricsDB.MAX)) - .toString()); - handle.bind( r.get(AllMetrics.ShardStateDimension.INDEX_NAME.toString()).toString(), r.get(AllMetrics.ShardStateDimension.SHARD_ID.toString()).toString(), r.get(AllMetrics.ShardStateDimension.SHARD_TYPE.toString()).toString(), r.get(AllMetrics.ShardStateDimension.NODE_NAME.toString()).toString(), r.get(AllMetrics.ShardStateDimension.SHARD_STATE.toString()).toString(), - sumShardState, - avgShardState, - minShardState, - maxShardState); + 1.0, + 1.0, + 1.0, + 1.0); } handle.execute(); long mFinalT = System.currentTimeMillis(); LOG.debug( "Total time taken for writing shard state event queue metrics metricsdb: {}", mFinalT - mCurrT); + PerformanceAnalyzerApp.READER_METRICS_AGGREGATOR.updateStat(ReaderMetrics.SHARD_STATE_EMITTER_EXECUTION_TIME, + "", mFinalT - mCurrT); } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsProcessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsProcessor.java index c2dd53241..a20146687 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsProcessor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsProcessor.java @@ -1,3 +1,18 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; @@ -23,6 +38,8 @@ public class ShardStateMetricsProcessor implements EventProcessor { private BatchBindStep handle; private static final ObjectMapper MAPPER = new ObjectMapper(); private static final TypeReference> TYPE_REF = new TypeReference>() {}; + private long startTime; + private long endTime; private ShardStateMetricsProcessor(ShardStateMetricsSnapshot snapshot) { this.shardStateMetricsSnapshot = snapshot; @@ -31,7 +48,8 @@ private ShardStateMetricsProcessor(ShardStateMetricsSnapshot snapshot) { static ShardStateMetricsProcessor buildShardStateMetricEventsProcessor( long currWindowStartTime, Connection conn, - NavigableMap shardStateEventMetricsMap) { + NavigableMap shardStateEventMetricsMap) { ShardStateMetricsSnapshot shardStateSnap = shardStateEventMetricsMap.get(currWindowStartTime); if (shardStateSnap == null) { shardStateSnap = new ShardStateMetricsSnapshot(conn, currWindowStartTime); @@ -43,6 +61,8 @@ static ShardStateMetricsProcessor buildShardStateMetricEventsProcessor( @Override public void initializeProcessing(long startTime, long endTime) { this.handle = shardStateMetricsSnapshot.startBatchPut(); + this.startTime = startTime; + this.endTime = endTime; } @Override diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshot.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshot.java index 509a19a4d..6772be475 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshot.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshot.java @@ -1,8 +1,21 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.DBUtils; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; import com.google.common.annotations.VisibleForTesting; import java.sql.Connection; @@ -26,7 +39,6 @@ public class ShardStateMetricsSnapshot implements Removable { private static final Logger LOG = LogManager.getLogger(ShardStateMetricsSnapshot.class); private final DSLContext create; private final String tableName; - private static final Long EXPIRE_AFTER = 1200000L; private static final List> columns = new ArrayList>() { { @@ -89,70 +101,4 @@ public void putMetrics(String shard_state, Map dimensions) { .set(dimensionMap) .execute(); } - - /** This method returns the aggregated ShardState metrics with Shard State column as the value and dummy values - * "1.0" in aggreagted columns[sum, avg, min and max] - * @return Result of records. - */ - public Result fetchAggregatedShardStateMetrics() { - List> fields = new ArrayList>() { - { - this.add( - DSL.field( - DSL.name(AllMetrics.ShardStateDimension.INDEX_NAME.toString()), - String.class) - ); - this.add( - DSL.field( - DSL.name(AllMetrics.ShardStateDimension.SHARD_ID.toString()), - String.class) - ); - this.add( - DSL.field( - DSL.name(AllMetrics.ShardStateDimension.SHARD_TYPE.toString()), - String.class) - ); - this.add( - DSL.field( - DSL.name(AllMetrics.ShardStateDimension.NODE_NAME.toString()), - String.class) - ); - this.add( - DSL.field( - DSL.name(AllMetrics.ShardStateDimension.SHARD_STATE.toString()), - String.class) - ); - this.add( - DSL.val(1.0) - .as( - DBUtils.getAggFieldName( - AllMetrics.ShardStateValue.SHARD_STATE.toString(), - MetricsDB.SUM)) - ); - this.add( - DSL.val(1.0) - .as( - DBUtils.getAggFieldName( - AllMetrics.ShardStateValue.SHARD_STATE.toString(), - MetricsDB.AVG)) - ); - this.add( - DSL.val(1.0) - .as( - DBUtils.getAggFieldName( - AllMetrics.ShardStateValue.SHARD_STATE.toString(), - MetricsDB.MIN)) - ); - this.add( - DSL.val(1.0) - .as( - DBUtils.getAggFieldName( - AllMetrics.ShardStateValue.SHARD_STATE.toString(), - MetricsDB.MAX)) - ); - } - }; - return create.select(fields).from(DSL.table(this.tableName)).fetch(); - } - } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshotTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshotTest.java index b6fb73371..1f6cc1333 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshotTest.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ShardStateMetricsSnapshotTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; @@ -31,7 +46,7 @@ public void testPutMetrics() { handle.bind("indexName", "shardId", "p","nodeName","Unassigned"); handle.execute(); - Result rt = shardStateMetricsSnapshot.fetchAggregatedShardStateMetrics(); + Result rt = shardStateMetricsSnapshot.fetchAll(); assertEquals(1, rt.size()); String shard_state = rt.get(0).get(AllMetrics.ShardStateValue.SHARD_STATE.toString()).toString();