Add current node weight as an APM metric (elastic#117557)
As discussed previously, the current node weight (calculated the same way as the node weights used in the desired balance computations) might also be useful to have as a metric. The difference is that the current node weight is calculated from the current cluster state rather than from the internal state of the BalancedShardsAllocator (i.e. Balancer and ModelNode). To share all the weight-calculation logic, I had to move the weight function and a few related utilities out of BalancedShardsAllocator. NodeAllocationStatsProvider is still shared by both the AllocationStatsService and the desired balance metric collection.

Relates ES-10080
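For context, the node weight reported by the new metric is the node-level component of the balancer's weight function: each balance factor is normalized by the sum of all four factors, and a node is scored by how far its shard count, write load, and disk usage deviate from the cluster-wide averages. Below is a minimal standalone sketch of that computation (NodeWeightSketch and the factor values are illustrative assumptions, not the actual Elasticsearch classes; the shared logic lives in WeightFunction#nodeWeight):

```java
// Standalone sketch, not Elasticsearch code: mirrors how WeightFunction
// normalizes the balance factors (theta_i = factor_i / sum of factors) and
// scores a node by its distance from the cluster-wide averages.
public final class NodeWeightSketch {

    private final float thetaShard;     // like theta0 (shard balance)
    private final float thetaIndex;     // like theta1 (index balance); used only for per-index weights
    private final float thetaWriteLoad; // like theta2 (write load balance)
    private final float thetaDisk;      // like theta3 (disk usage balance)

    NodeWeightSketch(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) {
        float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance;
        if (sum <= 0.0f) {
            throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
        }
        thetaShard = shardBalance / sum;
        thetaIndex = indexBalance / sum;
        thetaWriteLoad = writeLoadBalance / sum;
        thetaDisk = diskUsageBalance / sum;
    }

    // A node sitting exactly at the cluster averages scores ~0; overloaded
    // nodes score > 0 and underloaded nodes score < 0.
    float nodeWeight(
        int nodeShards,
        float avgShardsPerNode,
        double nodeWriteLoad,
        double avgWriteLoadPerNode,
        double nodeDiskUsageInBytes,
        double avgDiskUsageInBytesPerNode
    ) {
        final float shardTerm = nodeShards - avgShardsPerNode;
        final float writeLoadTerm = (float) (nodeWriteLoad - avgWriteLoadPerNode);
        final float diskTerm = (float) (nodeDiskUsageInBytes - avgDiskUsageInBytesPerNode);
        return thetaShard * shardTerm + thetaWriteLoad * writeLoadTerm + thetaDisk * diskTerm;
    }

    public static void main(String[] args) {
        // Illustrative factor values only (assumptions, not necessarily the cluster defaults).
        NodeWeightSketch wf = new NodeWeightSketch(0.45f, 0.55f, 10.0f, 2e-11f);
        // A node holding 12 shards against a cluster average of 10, with slightly
        // above-average write load and ~1GB more disk usage than average:
        float weight = wf.nodeWeight(12, 10.0f, 1.5, 1.0, 6_000_000_000d, 5_000_000_000d);
        System.out.println("current node weight = " + weight);
    }
}
```

The point of the refactor is that this same calculation can now be fed either from the Balancer's internal ModelNode state (the existing desired-balance metric) or from totals accumulated while walking the current RoutingNodes, which is what NodeAllocationStatsProvider does for the new metric.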
pxsalehi authored Nov 26, 2024
1 parent 2bc1b4f commit e9f899e
Showing 12 changed files with 297 additions and 149 deletions.
@@ -117,6 +117,15 @@ public void testDesiredBalanceMetrics() {
             assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
             assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
         }
+        final var currentNodeWeightsMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
+            DesiredBalanceMetrics.CURRENT_NODE_WEIGHT_METRIC_NAME
+        );
+        assertThat(currentNodeWeightsMetrics.size(), equalTo(2));
+        for (var nodeStat : currentNodeWeightsMetrics) {
+            assertTrue(nodeStat.isDouble());
+            assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
+            assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
+        }
         final var currentNodeShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement(
             DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME
         );
@@ -196,6 +205,7 @@ private static void assertMetricsAreBeingPublished(String nodeName, boolean shou
             testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME),
             matcher
         );
+        assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WEIGHT_METRIC_NAME), matcher);
         assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WRITE_LOAD_METRIC_NAME), matcher);
         assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME), matcher);
         assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME), matcher);
@@ -139,7 +139,7 @@ public ClusterModule(
         this.clusterPlugins = clusterPlugins;
         this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
         this.allocationDeciders = new AllocationDeciders(deciderList);
-        var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster);
+        var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster, clusterService.getClusterSettings());
         this.shardsAllocator = createShardsAllocator(
             settings,
             clusterService.getClusterSettings(),
@@ -17,6 +17,7 @@

 import java.util.Map;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;

 public class AllocationStatsService {
     private final ClusterService clusterService;
@@ -39,6 +40,26 @@ public AllocationStatsService(
     }

     public Map<String, NodeAllocationStats> stats() {
-        return nodeAllocationStatsProvider.stats(clusterService.state(), clusterInfoService.getClusterInfo(), desiredBalanceSupplier.get());
+        var state = clusterService.state();
+        var stats = nodeAllocationStatsProvider.stats(
+            state.metadata(),
+            state.getRoutingNodes(),
+            clusterInfoService.getClusterInfo(),
+            desiredBalanceSupplier.get()
+        );
+        return stats.entrySet()
+            .stream()
+            .collect(
+                Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> new NodeAllocationStats(
+                        e.getValue().shards(),
+                        e.getValue().undesiredShards(),
+                        e.getValue().forecastedIngestLoad(),
+                        e.getValue().forecastedDiskUsage(),
+                        e.getValue().currentDiskUsage()
+                    )
+                )
+            );
     }
 }
@@ -10,11 +10,15 @@
 package org.elasticsearch.cluster.routing.allocation;

 import org.elasticsearch.cluster.ClusterInfo;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
+import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.core.Nullable;

@@ -23,17 +27,47 @@
 public class NodeAllocationStatsProvider {
     private final WriteLoadForecaster writeLoadForecaster;

-    public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster) {
+    private volatile float indexBalanceFactor;
+    private volatile float shardBalanceFactor;
+    private volatile float writeLoadBalanceFactor;
+    private volatile float diskUsageBalanceFactor;
+
+    public record NodeAllocationAndClusterBalanceStats(
+        int shards,
+        int undesiredShards,
+        double forecastedIngestLoad,
+        long forecastedDiskUsage,
+        long currentDiskUsage,
+        float currentNodeWeight
+    ) {}
+
+    public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster, ClusterSettings clusterSettings) {
         this.writeLoadForecaster = writeLoadForecaster;
+        clusterSettings.initializeAndWatch(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value);
+        clusterSettings.initializeAndWatch(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value);
+        clusterSettings.initializeAndWatch(
+            BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING,
+            value -> this.writeLoadBalanceFactor = value
+        );
+        clusterSettings.initializeAndWatch(
+            BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING,
+            value -> this.diskUsageBalanceFactor = value
+        );
     }

-    public Map<String, NodeAllocationStats> stats(
-        ClusterState clusterState,
+    public Map<String, NodeAllocationAndClusterBalanceStats> stats(
+        Metadata metadata,
+        RoutingNodes routingNodes,
         ClusterInfo clusterInfo,
         @Nullable DesiredBalance desiredBalance
     ) {
-        var stats = Maps.<String, NodeAllocationStats>newMapWithExpectedSize(clusterState.getRoutingNodes().size());
-        for (RoutingNode node : clusterState.getRoutingNodes()) {
+        var weightFunction = new WeightFunction(shardBalanceFactor, indexBalanceFactor, writeLoadBalanceFactor, diskUsageBalanceFactor);
+        var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
+        var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
+        var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes);
+
+        var stats = Maps.<String, NodeAllocationAndClusterBalanceStats>newMapWithExpectedSize(routingNodes.size());
+        for (RoutingNode node : routingNodes) {
             int shards = 0;
             int undesiredShards = 0;
             double forecastedWriteLoad = 0.0;
@@ -44,7 +78,7 @@ public Map<String, NodeAllocationStats> stats(
                     continue;
                 }
                 shards++;
-                IndexMetadata indexMetadata = clusterState.metadata().getIndexSafe(shardRouting.index());
+                IndexMetadata indexMetadata = metadata.getIndexSafe(shardRouting.index());
                 if (isDesiredAllocation(desiredBalance, shardRouting) == false) {
                     undesiredShards++;
                 }
@@ -54,14 +88,23 @@ public Map<String, NodeAllocationStats> stats(
                 currentDiskUsage += shardSize;

             }
+            float currentNodeWeight = weightFunction.nodeWeight(
+                shards,
+                avgShardsPerNode,
+                forecastedWriteLoad,
+                avgWriteLoadPerNode,
+                currentDiskUsage,
+                avgDiskUsageInBytesPerNode
+            );
             stats.put(
                 node.nodeId(),
-                new NodeAllocationStats(
+                new NodeAllocationAndClusterBalanceStats(
                     shards,
                     desiredBalance != null ? undesiredShards : -1,
                     forecastedWriteLoad,
                     forecastedDiskUsage,
-                    currentDiskUsage
+                    currentDiskUsage,
+                    currentNodeWeight
                 )
             );
         }
@@ -168,14 +168,17 @@ private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction w
         Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> nodeLevelWeights = new HashMap<>();
         for (var entry : balancer.nodes.entrySet()) {
             var node = entry.getValue();
+            var nodeWeight = weightFunction.nodeWeight(
+                node.numShards(),
+                balancer.avgShardsPerNode(),
+                node.writeLoad(),
+                balancer.avgWriteLoadPerNode(),
+                node.diskUsageInBytes(),
+                balancer.avgDiskUsageInBytesPerNode()
+            );
             nodeLevelWeights.put(
                 node.routingNode.node(),
-                new DesiredBalanceMetrics.NodeWeightStats(
-                    node.numShards(),
-                    node.diskUsageInBytes(),
-                    node.writeLoad(),
-                    weightFunction.nodeWeight(balancer, node)
-                )
+                new DesiredBalanceMetrics.NodeWeightStats(node.numShards(), node.diskUsageInBytes(), node.writeLoad(), nodeWeight)
             );
         }
         allocation.routingNodes().setBalanceWeightStatsPerNode(nodeLevelWeights);
@@ -252,65 +255,6 @@ public float getShardBalance() {
         return shardBalanceFactor;
     }

-    /**
-     * This class is the primary weight function used to create balanced over nodes and shards in the cluster.
-     * Currently this function has 3 properties:
-     * <ul>
-     * <li><code>index balance</code> - balance property over shards per index</li>
-     * <li><code>shard balance</code> - balance property over shards per cluster</li>
-     * </ul>
-     * <p>
-     * Each of these properties are expressed as factor such that the properties factor defines the relative
-     * importance of the property for the weight function. For example if the weight function should calculate
-     * the weights only based on a global (shard) balance the index balance can be set to {@code 0.0} and will
-     * in turn have no effect on the distribution.
-     * </p>
-     * The weight per index is calculated based on the following formula:
-     * <ul>
-     * <li>
-     * <code>weight<sub>index</sub>(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index))</code>
-     * </li>
-     * <li>
-     * <code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code>
-     * </li>
-     * </ul>
-     * <code>weight(node, index) = weight<sub>index</sub>(node, index) + weight<sub>node</sub>(node, index)</code>
-     */
-    private static class WeightFunction {
-
-        private final float theta0;
-        private final float theta1;
-        private final float theta2;
-        private final float theta3;
-
-        WeightFunction(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) {
-            float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance;
-            if (sum <= 0.0f) {
-                throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
-            }
-            theta0 = shardBalance / sum;
-            theta1 = indexBalance / sum;
-            theta2 = writeLoadBalance / sum;
-            theta3 = diskUsageBalance / sum;
-        }
-
-        float weight(Balancer balancer, ModelNode node, String index) {
-            final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
-            return nodeWeight(balancer, node) + theta1 * weightIndex;
-        }
-
-        float nodeWeight(Balancer balancer, ModelNode node) {
-            final float weightShard = node.numShards() - balancer.avgShardsPerNode();
-            final float ingestLoad = (float) (node.writeLoad() - balancer.avgWriteLoadPerNode());
-            final float diskUsage = (float) (node.diskUsageInBytes() - balancer.avgDiskUsageInBytesPerNode());
-            return theta0 * weightShard + theta2 * ingestLoad + theta3 * diskUsage;
-        }
-
-        float minWeightDelta(Balancer balancer, String index) {
-            return theta0 * 1 + theta1 * 1 + theta2 * balancer.getShardWriteLoad(index) + theta3 * balancer.maxShardSizeBytes(index);
-        }
-    }
-
     /**
      * A {@link Balancer}
      */
@@ -335,63 +279,13 @@ private Balancer(WriteLoadForecaster writeLoadForecaster, RoutingAllocation allo
             this.metadata = allocation.metadata();
             this.weight = weight;
             this.threshold = threshold;
-            avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size();
-            avgWriteLoadPerNode = getTotalWriteLoad(writeLoadForecaster, metadata) / routingNodes.size();
-            avgDiskUsageInBytesPerNode = ((double) getTotalDiskUsageInBytes(allocation.clusterInfo(), metadata) / routingNodes.size());
+            avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
+            avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
+            avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes);
             nodes = Collections.unmodifiableMap(buildModelFromAssigned());
             sorter = newNodeSorter();
         }

-        private static double getTotalWriteLoad(WriteLoadForecaster writeLoadForecaster, Metadata metadata) {
-            double writeLoad = 0.0;
-            for (IndexMetadata indexMetadata : metadata.indices().values()) {
-                writeLoad += getIndexWriteLoad(writeLoadForecaster, indexMetadata);
-            }
-            return writeLoad;
-        }
-
-        private static double getIndexWriteLoad(WriteLoadForecaster writeLoadForecaster, IndexMetadata indexMetadata) {
-            var shardWriteLoad = writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
-            return shardWriteLoad * numberOfCopies(indexMetadata);
-        }
-
-        private static long getTotalDiskUsageInBytes(ClusterInfo clusterInfo, Metadata metadata) {
-            long totalDiskUsageInBytes = 0;
-            for (IndexMetadata indexMetadata : metadata.indices().values()) {
-                totalDiskUsageInBytes += getIndexDiskUsageInBytes(clusterInfo, indexMetadata);
-            }
-            return totalDiskUsageInBytes;
-        }
-
-        // Visible for testing
-        static long getIndexDiskUsageInBytes(ClusterInfo clusterInfo, IndexMetadata indexMetadata) {
-            if (indexMetadata.ignoreDiskWatermarks()) {
-                // disk watermarks are ignored for partial searchable snapshots
-                // and is equivalent to indexMetadata.isPartialSearchableSnapshot()
-                return 0;
-            }
-            final long forecastedShardSize = indexMetadata.getForecastedShardSizeInBytes().orElse(-1L);
-            long totalSizeInBytes = 0;
-            int shardCount = 0;
-            for (int shard = 0; shard < indexMetadata.getNumberOfShards(); shard++) {
-                final ShardId shardId = new ShardId(indexMetadata.getIndex(), shard);
-                final long primaryShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, true, -1L));
-                if (primaryShardSize != -1L) {
-                    totalSizeInBytes += primaryShardSize;
-                    shardCount++;
-                }
-                final long replicaShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, false, -1L));
-                if (replicaShardSize != -1L) {
-                    totalSizeInBytes += replicaShardSize * indexMetadata.getNumberOfReplicas();
-                    shardCount += indexMetadata.getNumberOfReplicas();
-                }
-            }
-            if (shardCount == numberOfCopies(indexMetadata)) {
-                return totalSizeInBytes;
-            }
-            return shardCount == 0 ? 0 : (totalSizeInBytes / shardCount) * numberOfCopies(indexMetadata);
-        }
-
         private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) {
             if (indexMetadata.ignoreDiskWatermarks()) {
                 // disk watermarks are ignored for partial searchable snapshots
@@ -401,10 +295,6 @@ private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMet
             return Math.max(indexMetadata.getForecastedShardSizeInBytes().orElse(0L), clusterInfo.getShardSize(shardRouting, 0L));
         }

-        private static int numberOfCopies(IndexMetadata indexMetadata) {
-            return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas());
-        }
-
         private float getShardWriteLoad(String index) {
            return (float) writeLoadForecaster.getForecastedWriteLoad(metadata.index(index)).orElse(0.0);
         }
@@ -1433,7 +1323,7 @@ public float weight(ModelNode node) {
            }

            public float minWeightDelta() {
-               return function.minWeightDelta(balancer, index);
+               return function.minWeightDelta(balancer.getShardWriteLoad(index), balancer.maxShardSizeBytes(index));
            }

            @Override
@@ -21,7 +21,7 @@
  *
  * @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated
  * @param weightsPerNode The node weights calculated based on
- *                       {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.WeightFunction#nodeWeight}
+ *                       {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#nodeWeight}
  */
 public record DesiredBalance(
     long lastConvergedIndex,