Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Publish Shard State Metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Arpita committed Oct 20, 2020
1 parent d590040 commit 36c69dc
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,14 @@ public String toString() {
}
}

/*
* column names of Shard_State table
* IndexName | ShardID | ShardType | NodeName | Shard_State | sum | avg | min |max
*
* <p>Example:
* pmc|4|p|elasticsearch1|UNASSIGNED|1.0|1.0|1.0|1.0
* pmc|2|p|elasticsearch2|INITIALIZING|1.0|1.0|1.0|1.0
*/
public enum ShardStateDimension implements MetricDimension {
INDEX_NAME(CommonDimension.INDEX_NAME.toString()),
SHARD_ID(CommonDimension.SHARD_ID.toString()),
Expand All @@ -1179,6 +1187,27 @@ public static class Constants {
}
}

public enum ShardType {
SHARD_PRIMARY(Constants.SHARD_PRIMARY),
SHARD_REPLICA(Constants.SHARD_REPLICA);

private final String value;

ShardType(String value) {
this.value = value;
}

@Override
public String toString() {
return value;
}

public static class Constants {
public static final String SHARD_PRIMARY = "p";
public static final String SHARD_REPLICA = "r";
}
}

public enum ShardStateValue implements MetricValue {
SHARD_STATE(Constants.SHARD_STATE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ public class PerformanceAnalyzerMetrics {
public static final String MASTER_CURRENT = "current";
public static final String MASTER_META_DATA = "metadata";
public static final String METRIC_CURRENT_TIME = "current_time";
public static final String SHARD_PRIMARY = "p";
public static final String SHARD_REPLICA = "r";
public static final int QUEUE_SIZE = PluginSettings.instance().getWriterQueueSize();

// TODO: Comeup with a more sensible number.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,9 @@ public class MetricsModel {
MetricUnits.MILLISECOND.toString(), AllMetrics.MasterMetricDimensions.values()));

allMetricsInitializer.put(
AllMetrics.ShardStateValue.SHARD_STATE.toString(),
new MetricAttributes(
MetricUnits.COUNT.toString(), AllMetrics.ShardStateDimension.values()));

AllMetrics.ShardStateValue.SHARD_STATE.toString(),
new MetricAttributes(
MetricUnits.COUNT.toString(), AllMetrics.ShardStateDimension.values()));

ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.stats.eval.Statistics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.DBUtils;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.TroubleshootingConfig;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonMetric;
Expand All @@ -32,6 +33,8 @@
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ReaderMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.BatchBindStep;
Expand Down Expand Up @@ -87,6 +90,17 @@ public class MetricsEmitter {
}
};

private static final List<String> SHARD_STATE_TABLE_DIMENSIONS =
new ArrayList<String>() {
{
this.add(AllMetrics.ShardStateDimension.INDEX_NAME.toString());
this.add(AllMetrics.ShardStateDimension.SHARD_ID.toString());
this.add(AllMetrics.ShardStateDimension.SHARD_TYPE.toString());
this.add(AllMetrics.ShardStateDimension.NODE_NAME.toString());
this.add(AllMetrics.ShardStateDimension.SHARD_STATE.toString());
}
};

public static void emitAggregatedOSMetrics(
final DSLContext create,
final MetricsDB db,
Expand Down Expand Up @@ -852,74 +866,33 @@ public static void emitNodeMetrics(
public static void emitShardStateMetric(
MetricsDB metricsDB, ShardStateMetricsSnapshot shardStateMetricsSnapshot) {
long mCurrT = System.currentTimeMillis();
Result<Record> shardStateMetrics = shardStateMetricsSnapshot.fetchAggregatedShardStateMetrics();
List<String> dims =
new ArrayList<String>() {
{
this.add(AllMetrics.ShardStateDimension.INDEX_NAME.toString());
this.add(AllMetrics.ShardStateDimension.SHARD_ID.toString());
this.add(AllMetrics.ShardStateDimension.SHARD_TYPE.toString());
this.add(AllMetrics.ShardStateDimension.NODE_NAME.toString());
this.add(AllMetrics.ShardStateDimension.SHARD_STATE.toString());
}
};
Result<Record> shardStateMetrics = shardStateMetricsSnapshot.fetchAll();
metricsDB.createMetric(
new Metric<Double>(AllMetrics.ShardStateValue.SHARD_STATE.toString(), 0d),
dims);
SHARD_STATE_TABLE_DIMENSIONS);

BatchBindStep handle =
metricsDB.startBatchPut(
new Metric<Double>(AllMetrics.ShardStateValue.SHARD_STATE.toString(), 0d),
dims);
SHARD_STATE_TABLE_DIMENSIONS);

for (Record r : shardStateMetrics) {

Double sumShardState =
Double.parseDouble(
r.get(
DBUtils.getAggFieldName(
AllMetrics.ShardStateValue.SHARD_STATE.toString(),
MetricsDB.SUM))
.toString());

Double avgShardState =
Double.parseDouble(
r.get(
DBUtils.getAggFieldName(
AllMetrics.ShardStateValue.SHARD_STATE.toString(),
MetricsDB.AVG))
.toString());

Double minShardState =
Double.parseDouble(
r.get(
DBUtils.getAggFieldName(
AllMetrics.ShardStateValue.SHARD_STATE.toString(),
MetricsDB.MIN))
.toString());

Double maxShardState =
Double.parseDouble(
r.get(
DBUtils.getAggFieldName(
AllMetrics.ShardStateValue.SHARD_STATE.toString(),
MetricsDB.MAX))
.toString());

handle.bind(
r.get(AllMetrics.ShardStateDimension.INDEX_NAME.toString()).toString(),
r.get(AllMetrics.ShardStateDimension.SHARD_ID.toString()).toString(),
r.get(AllMetrics.ShardStateDimension.SHARD_TYPE.toString()).toString(),
r.get(AllMetrics.ShardStateDimension.NODE_NAME.toString()).toString(),
r.get(AllMetrics.ShardStateDimension.SHARD_STATE.toString()).toString(),
sumShardState,
avgShardState,
minShardState,
maxShardState);
1.0,
1.0,
1.0,
1.0);
}
handle.execute();
long mFinalT = System.currentTimeMillis();
LOG.debug(
"Total time taken for writing shard state event queue metrics metricsdb: {}", mFinalT - mCurrT);
PerformanceAnalyzerApp.READER_METRICS_AGGREGATOR.updateStat(ReaderMetrics.SHARD_STATE_EMITTER_EXECUTION_TIME,
"", mFinalT - mCurrT);
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
Expand All @@ -23,6 +38,8 @@ public class ShardStateMetricsProcessor implements EventProcessor {
private BatchBindStep handle;
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final TypeReference<HashMap<String, String>> TYPE_REF = new TypeReference<HashMap<String, String>>() {};
private long startTime;
private long endTime;

private ShardStateMetricsProcessor(ShardStateMetricsSnapshot snapshot) {
this.shardStateMetricsSnapshot = snapshot;
Expand All @@ -31,7 +48,8 @@ private ShardStateMetricsProcessor(ShardStateMetricsSnapshot snapshot) {
static ShardStateMetricsProcessor buildShardStateMetricEventsProcessor(
long currWindowStartTime,
Connection conn,
NavigableMap<Long, ShardStateMetricsSnapshot> shardStateEventMetricsMap) {
NavigableMap<Long,
ShardStateMetricsSnapshot> shardStateEventMetricsMap) {
ShardStateMetricsSnapshot shardStateSnap = shardStateEventMetricsMap.get(currWindowStartTime);
if (shardStateSnap == null) {
shardStateSnap = new ShardStateMetricsSnapshot(conn, currWindowStartTime);
Expand All @@ -43,6 +61,8 @@ static ShardStateMetricsProcessor buildShardStateMetricEventsProcessor(
@Override
public void initializeProcessing(long startTime, long endTime) {
this.handle = shardStateMetricsSnapshot.startBatchPut();
this.startTime = startTime;
this.endTime = endTime;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.DBUtils;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
import com.google.common.annotations.VisibleForTesting;

import java.sql.Connection;
Expand All @@ -26,7 +39,6 @@ public class ShardStateMetricsSnapshot implements Removable {
private static final Logger LOG = LogManager.getLogger(ShardStateMetricsSnapshot.class);
private final DSLContext create;
private final String tableName;
private static final Long EXPIRE_AFTER = 1200000L;
private static final List<Field<?>> columns =
new ArrayList<Field<?>>() {
{
Expand Down Expand Up @@ -89,70 +101,4 @@ public void putMetrics(String shard_state, Map<String, String> dimensions) {
.set(dimensionMap)
.execute();
}

/** This method returns the aggregated ShardState metrics with Shard State column as the value and dummy values
* "1.0" in aggreagted columns[sum, avg, min and max]
* @return Result of records.
*/
public Result<Record> fetchAggregatedShardStateMetrics() {
List<SelectField<?>> fields = new ArrayList<SelectField<?>>() {
{
this.add(
DSL.field(
DSL.name(AllMetrics.ShardStateDimension.INDEX_NAME.toString()),
String.class)
);
this.add(
DSL.field(
DSL.name(AllMetrics.ShardStateDimension.SHARD_ID.toString()),
String.class)
);
this.add(
DSL.field(
DSL.name(AllMetrics.ShardStateDimension.SHARD_TYPE.toString()),
String.class)
);
this.add(
DSL.field(
DSL.name(AllMetrics.ShardStateDimension.NODE_NAME.toString()),
String.class)
);
this.add(
DSL.field(
DSL.name(AllMetrics.ShardStateDimension.SHARD_STATE.toString()),
String.class)
);
this.add(
DSL.val(1.0)
.as(
DBUtils.getAggFieldName(
AllMetrics.ShardStateValue.SHARD_STATE.toString(),
MetricsDB.SUM))
);
this.add(
DSL.val(1.0)
.as(
DBUtils.getAggFieldName(
AllMetrics.ShardStateValue.SHARD_STATE.toString(),
MetricsDB.AVG))
);
this.add(
DSL.val(1.0)
.as(
DBUtils.getAggFieldName(
AllMetrics.ShardStateValue.SHARD_STATE.toString(),
MetricsDB.MIN))
);
this.add(
DSL.val(1.0)
.as(
DBUtils.getAggFieldName(
AllMetrics.ShardStateValue.SHARD_STATE.toString(),
MetricsDB.MAX))
);
}
};
return create.select(fields).from(DSL.table(this.tableName)).fetch();
}

}
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
Expand Down Expand Up @@ -31,7 +46,7 @@ public void testPutMetrics() {

handle.bind("indexName", "shardId", "p","nodeName","Unassigned");
handle.execute();
Result<Record> rt = shardStateMetricsSnapshot.fetchAggregatedShardStateMetrics();
Result<Record> rt = shardStateMetricsSnapshot.fetchAll();

assertEquals(1, rt.size());
String shard_state = rt.get(0).get(AllMetrics.ShardStateValue.SHARD_STATE.toString()).toString();
Expand Down

0 comments on commit 36c69dc

Please sign in to comment.