Skip to content

Commit

Permalink
Use Long instead of Double for allocation disk usage APM metrics (#116732)
Browse files Browse the repository at this point in the history

I was trying to build a dashboard on top of these metrics and came
across some zeros and negative values that I found a bit surprising.
Also, some long values were mistakenly exposed as Double metrics. I've
updated the metric test to make sure we have more concrete assertions.
(Note that the desired balance disk usage metric is a double, so I'm
keeping it as is.)
  • Loading branch information
pxsalehi authored Nov 13, 2024
1 parent cdd77c6 commit f18c7c8
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
Expand Down Expand Up @@ -56,8 +60,15 @@ public void testDesiredBalanceGaugeMetricsAreOnlyPublishedByCurrentMaster() thro
public void testDesiredBalanceMetrics() {
internalCluster().startNodes(2);
prepareCreate("test").setSettings(indexSettings(2, 1)).get();
indexRandom(randomBoolean(), "test", between(50, 100));
ensureGreen();

indexRandom(randomBoolean(), "test", between(50, 100));
flush("test");
// Make sure new cluster info is available
final var infoService = (InternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class);
ClusterInfoServiceUtils.setUpdateFrequency(infoService, TimeValue.timeValueMillis(200));
assertNotNull("info should not be null", ClusterInfoServiceUtils.refresh(infoService));

final var telemetryPlugin = getTelemetryPlugin(internalCluster().getMasterName());
telemetryPlugin.collect();
assertThat(telemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.UNASSIGNED_SHARDS_METRIC_NAME), not(empty()));
Expand All @@ -73,7 +84,7 @@ public void testDesiredBalanceMetrics() {
);
assertThat(desiredBalanceNodeWeightsMetrics.size(), equalTo(2));
for (var nodeStat : desiredBalanceNodeWeightsMetrics) {
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
assertTrue(nodeStat.isDouble());
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
Expand Down Expand Up @@ -122,15 +133,16 @@ public void testDesiredBalanceMetrics() {
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
final var currentNodeDiskUsageMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
final var currentNodeDiskUsageMetrics = telemetryPlugin.getLongGaugeMeasurement(
DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME
);
assertThat(currentNodeDiskUsageMetrics.size(), equalTo(2));
for (var nodeStat : currentNodeDiskUsageMetrics) {
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
assertThat(nodeStat.value().longValue(), greaterThanOrEqualTo(0L));
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
assertTrue(currentNodeDiskUsageMetrics.stream().anyMatch(m -> m.getLong() > 0L));
final var currentNodeUndesiredShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement(
DesiredBalanceMetrics.CURRENT_NODE_UNDESIRED_SHARD_COUNT_METRIC_NAME
);
Expand All @@ -140,15 +152,16 @@ public void testDesiredBalanceMetrics() {
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
final var currentNodeForecastedDiskUsageMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
final var currentNodeForecastedDiskUsageMetrics = telemetryPlugin.getLongGaugeMeasurement(
DesiredBalanceMetrics.CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME
);
assertThat(currentNodeForecastedDiskUsageMetrics.size(), equalTo(2));
for (var nodeStat : currentNodeForecastedDiskUsageMetrics) {
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
assertThat(nodeStat.value().longValue(), greaterThanOrEqualTo(0L));
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
assertTrue(currentNodeForecastedDiskUsageMetrics.stream().anyMatch(m -> m.getLong() > 0L));
}

private static void assertOnlyMasterIsPublishingMetrics() {
Expand Down Expand Up @@ -182,10 +195,10 @@ private static void assertMetricsAreBeingPublished(String nodeName, boolean shou
matcher
);
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WRITE_LOAD_METRIC_NAME), matcher);
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME), matcher);
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME), matcher);
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME), matcher);
assertThat(
testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME),
testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME),
matcher
);
assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public DesiredBalanceMetrics(MeterRegistry meterRegistry) {
"threads",
this::getCurrentNodeWriteLoadMetrics
);
meterRegistry.registerDoublesGauge(
meterRegistry.registerLongsGauge(
CURRENT_NODE_DISK_USAGE_METRIC_NAME,
"The current disk usage of nodes",
"bytes",
Expand All @@ -148,7 +148,7 @@ public DesiredBalanceMetrics(MeterRegistry meterRegistry) {
"unit",
this::getCurrentNodeShardCountMetrics
);
meterRegistry.registerDoublesGauge(
meterRegistry.registerLongsGauge(
CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME,
"The current forecasted disk usage of nodes",
"bytes",
Expand Down Expand Up @@ -231,16 +231,16 @@ private List<LongWithAttributes> getDesiredBalanceNodeShardCountMetrics() {
return values;
}

/**
 * Builds the per-node "current disk usage" gauge measurements.
 *
 * <p>Reads the latest per-node allocation stats snapshot from
 * {@code allocationStatsPerNodeRef} and emits one measurement per node whose
 * value is that node's {@code currentDiskUsage()} and whose attributes come
 * from {@code getNodeAttributes(node)}.
 *
 * <p>NOTE(review): this span is a rendered diff with old and new lines
 * interleaved. The {@code DoubleWithAttributes} lines appear to be the
 * pre-change version and the {@code LongWithAttributes} lines the post-change
 * version; confirm against the actual source file.
 *
 * @return one measurement per node in the stats snapshot, or an empty list
 *         when this node is not the master
 */
private List<DoubleWithAttributes> getCurrentNodeDiskUsageMetrics() {
private List<LongWithAttributes> getCurrentNodeDiskUsageMetrics() {
// Nodes that are not the master report no measurements at all.
if (nodeIsMaster == false) {
return List.of();
}
// Take the snapshot once so sizing and iteration use the same map.
var stats = allocationStatsPerNodeRef.get();
List<DoubleWithAttributes> doubles = new ArrayList<>(stats.size());
List<LongWithAttributes> values = new ArrayList<>(stats.size());
for (var node : stats.keySet()) {
doubles.add(new DoubleWithAttributes(stats.get(node).currentDiskUsage(), getNodeAttributes(node)));
values.add(new LongWithAttributes(stats.get(node).currentDiskUsage(), getNodeAttributes(node)));
}
return doubles;
return values;
}

private List<DoubleWithAttributes> getCurrentNodeWriteLoadMetrics() {
Expand All @@ -267,16 +267,16 @@ private List<LongWithAttributes> getCurrentNodeShardCountMetrics() {
return values;
}

/**
 * Builds the per-node "current forecasted disk usage" gauge measurements.
 *
 * <p>Reads the latest per-node allocation stats snapshot from
 * {@code allocationStatsPerNodeRef} and emits one measurement per node whose
 * value is that node's {@code forecastedDiskUsage()} and whose attributes come
 * from {@code getNodeAttributes(node)}.
 *
 * <p>NOTE(review): this span is a rendered diff with old and new lines
 * interleaved. The {@code DoubleWithAttributes} lines appear to be the
 * pre-change version and the {@code LongWithAttributes} lines the post-change
 * version; confirm against the actual source file.
 *
 * @return one measurement per node in the stats snapshot, or an empty list
 *         when this node is not the master
 */
private List<DoubleWithAttributes> getCurrentNodeForecastedDiskUsageMetrics() {
private List<LongWithAttributes> getCurrentNodeForecastedDiskUsageMetrics() {
// Nodes that are not the master report no measurements at all.
if (nodeIsMaster == false) {
return List.of();
}
// Take the snapshot once so sizing and iteration use the same map.
var stats = allocationStatsPerNodeRef.get();
List<DoubleWithAttributes> doubles = new ArrayList<>(stats.size());
List<LongWithAttributes> values = new ArrayList<>(stats.size());
for (var node : stats.keySet()) {
doubles.add(new DoubleWithAttributes(stats.get(node).forecastedDiskUsage(), getNodeAttributes(node)));
values.add(new LongWithAttributes(stats.get(node).forecastedDiskUsage(), getNodeAttributes(node)));
}
return doubles;
return values;
}

private List<LongWithAttributes> getCurrentNodeUndesiredShardCountMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.core.TimeValue;

import java.util.concurrent.TimeUnit;

Expand All @@ -37,4 +38,8 @@ protected boolean blockingAllowed() {
throw new AssertionError(e);
}
}

/**
 * Sets the update frequency on the given {@link InternalClusterInfoService}
 * by delegating to its own {@code setUpdateFrequency} method.
 *
 * <p>NOTE(review): presumably exists so tests outside this package can reach
 * a setter that is not public on the service itself; confirm its visibility.
 *
 * @param internalClusterInfoService the service whose update frequency is changed
 * @param updateFrequency the new update frequency, passed through unchanged
 */
public static void setUpdateFrequency(InternalClusterInfoService internalClusterInfoService, TimeValue updateFrequency) {
internalClusterInfoService.setUpdateFrequency(updateFrequency);
}
}

0 comments on commit f18c7c8

Please sign in to comment.