From e9f899ee6913fe00dc8ef7a4254c76e8dca31b47 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Tue, 26 Nov 2024 16:44:15 +0100 Subject: [PATCH] Add current node weight as an APM metric (#117557) As discussed previously, the current node weight (calculated the same way that we calculate for the desired balance computations) might also be useful to have as a metric. The difference is that the current node weight is calculated based on the current cluster state rather than the internal state of the BalancedShardsAllocator (i.e. Balancer and ModelNode). To share all the weight calculation logic I had to move out the weight function and a few related utilities. NodeAllocationStatsProvider is still shared by both the AllocationStatsService and the desired balance metric collection. Relates ES-10080 --- .../DesiredBalanceReconcilerMetricsIT.java | 10 ++ .../elasticsearch/cluster/ClusterModule.java | 2 +- .../allocation/AllocationStatsService.java | 23 ++- .../NodeAllocationStatsProvider.java | 61 ++++++- .../allocator/BalancedShardsAllocator.java | 136 ++------------- .../allocation/allocator/DesiredBalance.java | 2 +- .../allocator/DesiredBalanceMetrics.java | 26 ++- .../allocator/DesiredBalanceReconciler.java | 11 +- .../allocation/allocator/WeightFunction.java | 157 ++++++++++++++++++ .../AllocationStatsServiceTests.java | 6 +- .../BalancedShardsAllocatorTests.java | 2 +- .../cluster/ESAllocationTestCase.java | 10 +- 12 files changed, 297 insertions(+), 149 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java index b3ec4a5331180..355427c4e059b 100644 --- 
a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java @@ -117,6 +117,15 @@ public void testDesiredBalanceMetrics() { assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds))); assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames))); } + final var currentNodeWeightsMetrics = telemetryPlugin.getDoubleGaugeMeasurement( + DesiredBalanceMetrics.CURRENT_NODE_WEIGHT_METRIC_NAME + ); + assertThat(currentNodeWeightsMetrics.size(), equalTo(2)); + for (var nodeStat : currentNodeWeightsMetrics) { + assertTrue(nodeStat.isDouble()); + assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds))); + assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames))); + } final var currentNodeShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement( DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME ); @@ -196,6 +205,7 @@ private static void assertMetricsAreBeingPublished(String nodeName, boolean shou testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME), matcher ); + assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WEIGHT_METRIC_NAME), matcher); assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WRITE_LOAD_METRIC_NAME), matcher); assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME), matcher); assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME), matcher); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 046f4b6b0b251..c2da33f8f4135 
100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -139,7 +139,7 @@ public ClusterModule( this.clusterPlugins = clusterPlugins; this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins); this.allocationDeciders = new AllocationDeciders(deciderList); - var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster); + var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster, clusterService.getClusterSettings()); this.shardsAllocator = createShardsAllocator( settings, clusterService.getClusterSettings(), diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java index 0c82faaaeaa45..b98e9050d2b4a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.function.Supplier; +import java.util.stream.Collectors; public class AllocationStatsService { private final ClusterService clusterService; @@ -39,6 +40,26 @@ public AllocationStatsService( } public Map stats() { - return nodeAllocationStatsProvider.stats(clusterService.state(), clusterInfoService.getClusterInfo(), desiredBalanceSupplier.get()); + var state = clusterService.state(); + var stats = nodeAllocationStatsProvider.stats( + state.metadata(), + state.getRoutingNodes(), + clusterInfoService.getClusterInfo(), + desiredBalanceSupplier.get() + ); + return stats.entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> new NodeAllocationStats( + e.getValue().shards(), + e.getValue().undesiredShards(), + e.getValue().forecastedIngestLoad(), + 
e.getValue().forecastedDiskUsage(), + e.getValue().currentDiskUsage() + ) + ) + ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java index 157b409be14d3..8368f5916ef91 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java @@ -10,11 +10,15 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.ClusterInfo; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; +import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; @@ -23,17 +27,47 @@ public class NodeAllocationStatsProvider { private final WriteLoadForecaster writeLoadForecaster; - public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster) { + private volatile float indexBalanceFactor; + private volatile float shardBalanceFactor; + private volatile float writeLoadBalanceFactor; + private volatile float diskUsageBalanceFactor; + + public record NodeAllocationAndClusterBalanceStats( + int shards, + int undesiredShards, + double forecastedIngestLoad, + long forecastedDiskUsage, + long currentDiskUsage, + float currentNodeWeight + ) {} + + public 
NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster, ClusterSettings clusterSettings) { this.writeLoadForecaster = writeLoadForecaster; + clusterSettings.initializeAndWatch(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value); + clusterSettings.initializeAndWatch(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value); + clusterSettings.initializeAndWatch( + BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING, + value -> this.writeLoadBalanceFactor = value + ); + clusterSettings.initializeAndWatch( + BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING, + value -> this.diskUsageBalanceFactor = value + ); } - public Map stats( - ClusterState clusterState, + public Map stats( + Metadata metadata, + RoutingNodes routingNodes, ClusterInfo clusterInfo, @Nullable DesiredBalance desiredBalance ) { - var stats = Maps.newMapWithExpectedSize(clusterState.getRoutingNodes().size()); - for (RoutingNode node : clusterState.getRoutingNodes()) { + var weightFunction = new WeightFunction(shardBalanceFactor, indexBalanceFactor, writeLoadBalanceFactor, diskUsageBalanceFactor); + var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes); + var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes); + var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes); + + var stats = Maps.newMapWithExpectedSize(routingNodes.size()); + for (RoutingNode node : routingNodes) { int shards = 0; int undesiredShards = 0; double forecastedWriteLoad = 0.0; @@ -44,7 +78,7 @@ public Map stats( continue; } shards++; - IndexMetadata indexMetadata = clusterState.metadata().getIndexSafe(shardRouting.index()); + IndexMetadata indexMetadata = metadata.getIndexSafe(shardRouting.index()); if (isDesiredAllocation(desiredBalance, shardRouting) == false) { undesiredShards++; } @@ -54,14 
+88,23 @@ public Map stats( currentDiskUsage += shardSize; } + float currentNodeWeight = weightFunction.nodeWeight( + shards, + avgShardsPerNode, + forecastedWriteLoad, + avgWriteLoadPerNode, + currentDiskUsage, + avgDiskUsageInBytesPerNode + ); stats.put( node.nodeId(), - new NodeAllocationStats( + new NodeAllocationAndClusterBalanceStats( shards, desiredBalance != null ? undesiredShards : -1, forecastedWriteLoad, forecastedDiskUsage, - currentDiskUsage + currentDiskUsage, + currentNodeWeight ) ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 5b8fb0c7e9203..8dd1f14564ce9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -168,14 +168,17 @@ private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction w Map nodeLevelWeights = new HashMap<>(); for (var entry : balancer.nodes.entrySet()) { var node = entry.getValue(); + var nodeWeight = weightFunction.nodeWeight( + node.numShards(), + balancer.avgShardsPerNode(), + node.writeLoad(), + balancer.avgWriteLoadPerNode(), + node.diskUsageInBytes(), + balancer.avgDiskUsageInBytesPerNode() + ); nodeLevelWeights.put( node.routingNode.node(), - new DesiredBalanceMetrics.NodeWeightStats( - node.numShards(), - node.diskUsageInBytes(), - node.writeLoad(), - weightFunction.nodeWeight(balancer, node) - ) + new DesiredBalanceMetrics.NodeWeightStats(node.numShards(), node.diskUsageInBytes(), node.writeLoad(), nodeWeight) ); } allocation.routingNodes().setBalanceWeightStatsPerNode(nodeLevelWeights); @@ -252,65 +255,6 @@ public float getShardBalance() { return shardBalanceFactor; } - /** - * This class is the primary weight function used to create balanced 
over nodes and shards in the cluster. - * Currently this function has 3 properties: - *
<ul>
- * <li><code>index balance</code> - balance property over shards per index</li>
- * <li><code>shard balance</code> - balance property over shards per cluster</li>
- * </ul>
- * <p>
- * Each of these properties are expressed as factor such that the properties factor defines the relative
- * importance of the property for the weight function. For example if the weight function should calculate
- * the weights only based on a global (shard) balance the index balance can be set to {@code 0.0} and will
- * in turn have no effect on the distribution.
- * </p>
- * The weight per index is calculated based on the following formula:
- * <ul>
- * <li>
- * <code>weight<sub>index</sub>(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index))</code>
- * </li>
- * <li>
- * <code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code>
- * </li>
- * </ul>
- * weight(node, index) = weightindex(node, index) + weightnode(node, index) - */ - private static class WeightFunction { - - private final float theta0; - private final float theta1; - private final float theta2; - private final float theta3; - - WeightFunction(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) { - float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance; - if (sum <= 0.0f) { - throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); - } - theta0 = shardBalance / sum; - theta1 = indexBalance / sum; - theta2 = writeLoadBalance / sum; - theta3 = diskUsageBalance / sum; - } - - float weight(Balancer balancer, ModelNode node, String index) { - final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); - return nodeWeight(balancer, node) + theta1 * weightIndex; - } - - float nodeWeight(Balancer balancer, ModelNode node) { - final float weightShard = node.numShards() - balancer.avgShardsPerNode(); - final float ingestLoad = (float) (node.writeLoad() - balancer.avgWriteLoadPerNode()); - final float diskUsage = (float) (node.diskUsageInBytes() - balancer.avgDiskUsageInBytesPerNode()); - return theta0 * weightShard + theta2 * ingestLoad + theta3 * diskUsage; - } - - float minWeightDelta(Balancer balancer, String index) { - return theta0 * 1 + theta1 * 1 + theta2 * balancer.getShardWriteLoad(index) + theta3 * balancer.maxShardSizeBytes(index); - } - } - /** * A {@link Balancer} */ @@ -335,63 +279,13 @@ private Balancer(WriteLoadForecaster writeLoadForecaster, RoutingAllocation allo this.metadata = allocation.metadata(); this.weight = weight; this.threshold = threshold; - avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); - avgWriteLoadPerNode = getTotalWriteLoad(writeLoadForecaster, metadata) / routingNodes.size(); - avgDiskUsageInBytesPerNode = ((double) 
getTotalDiskUsageInBytes(allocation.clusterInfo(), metadata) / routingNodes.size()); + avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes); + avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes); + avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes); nodes = Collections.unmodifiableMap(buildModelFromAssigned()); sorter = newNodeSorter(); } - private static double getTotalWriteLoad(WriteLoadForecaster writeLoadForecaster, Metadata metadata) { - double writeLoad = 0.0; - for (IndexMetadata indexMetadata : metadata.indices().values()) { - writeLoad += getIndexWriteLoad(writeLoadForecaster, indexMetadata); - } - return writeLoad; - } - - private static double getIndexWriteLoad(WriteLoadForecaster writeLoadForecaster, IndexMetadata indexMetadata) { - var shardWriteLoad = writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0); - return shardWriteLoad * numberOfCopies(indexMetadata); - } - - private static long getTotalDiskUsageInBytes(ClusterInfo clusterInfo, Metadata metadata) { - long totalDiskUsageInBytes = 0; - for (IndexMetadata indexMetadata : metadata.indices().values()) { - totalDiskUsageInBytes += getIndexDiskUsageInBytes(clusterInfo, indexMetadata); - } - return totalDiskUsageInBytes; - } - - // Visible for testing - static long getIndexDiskUsageInBytes(ClusterInfo clusterInfo, IndexMetadata indexMetadata) { - if (indexMetadata.ignoreDiskWatermarks()) { - // disk watermarks are ignored for partial searchable snapshots - // and is equivalent to indexMetadata.isPartialSearchableSnapshot() - return 0; - } - final long forecastedShardSize = indexMetadata.getForecastedShardSizeInBytes().orElse(-1L); - long totalSizeInBytes = 0; - int shardCount = 0; - for (int shard = 0; shard < indexMetadata.getNumberOfShards(); shard++) { - final ShardId shardId = new ShardId(indexMetadata.getIndex(), shard); - final long 
primaryShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, true, -1L)); - if (primaryShardSize != -1L) { - totalSizeInBytes += primaryShardSize; - shardCount++; - } - final long replicaShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, false, -1L)); - if (replicaShardSize != -1L) { - totalSizeInBytes += replicaShardSize * indexMetadata.getNumberOfReplicas(); - shardCount += indexMetadata.getNumberOfReplicas(); - } - } - if (shardCount == numberOfCopies(indexMetadata)) { - return totalSizeInBytes; - } - return shardCount == 0 ? 0 : (totalSizeInBytes / shardCount) * numberOfCopies(indexMetadata); - } - private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) { if (indexMetadata.ignoreDiskWatermarks()) { // disk watermarks are ignored for partial searchable snapshots @@ -401,10 +295,6 @@ private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMet return Math.max(indexMetadata.getForecastedShardSizeInBytes().orElse(0L), clusterInfo.getShardSize(shardRouting, 0L)); } - private static int numberOfCopies(IndexMetadata indexMetadata) { - return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas()); - } - private float getShardWriteLoad(String index) { return (float) writeLoadForecaster.getForecastedWriteLoad(metadata.index(index)).orElse(0.0); } @@ -1433,7 +1323,7 @@ public float weight(ModelNode node) { } public float minWeightDelta() { - return function.minWeightDelta(balancer, index); + return function.minWeightDelta(balancer.getShardWriteLoad(index), balancer.maxShardSizeBytes(index)); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java index 9de95804b49b2..6ad44fdf3a9c0 100644 --- 
a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java @@ -21,7 +21,7 @@ * * @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated * @param weightsPerNode The node weights calculated based on - * {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.WeightFunction#nodeWeight} + * {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#nodeWeight} */ public record DesiredBalance( long lastConvergedIndex, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index cf8840dc95724..9f6487bdc8abd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -10,7 +10,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats; +import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider.NodeAllocationAndClusterBalanceStats; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; import org.elasticsearch.telemetry.metric.LongWithAttributes; import org.elasticsearch.telemetry.metric.MeterRegistry; @@ -41,6 +41,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w public static final String DESIRED_BALANCE_NODE_DISK_USAGE_METRIC_NAME = "es.allocator.desired_balance.allocations.node_disk_usage_bytes.current"; + public static final String CURRENT_NODE_WEIGHT_METRIC_NAME = 
"es.allocator.allocations.node.weight.current"; public static final String CURRENT_NODE_SHARD_COUNT_METRIC_NAME = "es.allocator.allocations.node.shard_count.current"; public static final String CURRENT_NODE_WRITE_LOAD_METRIC_NAME = "es.allocator.allocations.node.write_load.current"; public static final String CURRENT_NODE_DISK_USAGE_METRIC_NAME = "es.allocator.allocations.node.disk_usage_bytes.current"; @@ -68,12 +69,13 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w private volatile long undesiredAllocations; private final AtomicReference> weightStatsPerNodeRef = new AtomicReference<>(Map.of()); - private final AtomicReference> allocationStatsPerNodeRef = new AtomicReference<>(Map.of()); + private final AtomicReference> allocationStatsPerNodeRef = + new AtomicReference<>(Map.of()); public void updateMetrics( AllocationStats allocationStats, Map weightStatsPerNode, - Map nodeAllocationStats + Map nodeAllocationStats ) { assert allocationStats != null : "allocation stats cannot be null"; assert weightStatsPerNode != null : "node balance weight stats cannot be null"; @@ -124,6 +126,12 @@ public DesiredBalanceMetrics(MeterRegistry meterRegistry) { "bytes", this::getDesiredBalanceNodeDiskUsageMetrics ); + meterRegistry.registerDoublesGauge( + CURRENT_NODE_WEIGHT_METRIC_NAME, + "The weight of nodes based on the current allocation state", + "unit", + this::getCurrentNodeWeightMetrics + ); meterRegistry.registerLongsGauge( DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME, "Shard count of nodes in the computed desired balance", @@ -291,6 +299,18 @@ private List getCurrentNodeUndesiredShardCountMetrics() { return values; } + private List getCurrentNodeWeightMetrics() { + if (nodeIsMaster == false) { + return List.of(); + } + var stats = allocationStatsPerNodeRef.get(); + List doubles = new ArrayList<>(stats.size()); + for (var node : stats.keySet()) { + doubles.add(new DoubleWithAttributes(stats.get(node).currentNodeWeight(), 
getNodeAttributes(node))); + } + return doubles; + } + private Map getNodeAttributes(DiscoveryNode node) { return Map.of("node_id", node.getId(), "node_name", node.getName()); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 5ad29debc8f20..2ee905634f760 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -20,8 +20,8 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider; +import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider.NodeAllocationAndClusterBalanceStats; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics.AllocationStats; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -159,8 +159,13 @@ void run() { } private void updateDesireBalanceMetrics(AllocationStats allocationStats) { - var stats = nodeAllocationStatsProvider.stats(allocation.getClusterState(), allocation.clusterInfo(), desiredBalance); - Map nodeAllocationStats = new HashMap<>(stats.size()); + var stats = nodeAllocationStatsProvider.stats( + allocation.metadata(), + allocation.routingNodes(), + allocation.clusterInfo(), + desiredBalance + ); + Map nodeAllocationStats = new HashMap<>(stats.size()); for (var entry : stats.entrySet()) { var node = allocation.nodes().get(entry.getKey()); if (node != null) { 
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java new file mode 100644 index 0000000000000..7203a92b147f6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; +import org.elasticsearch.index.shard.ShardId; + +/** + * This class is the primary weight function used to create balanced over nodes and shards in the cluster. + * Currently this function has 3 properties: + *
<ul>
+ * <li><code>index balance</code> - balance property over shards per index</li>
+ * <li><code>shard balance</code> - balance property over shards per cluster</li>
+ * </ul>
+ * <p>
+ * Each of these properties are expressed as factor such that the properties factor defines the relative
+ * importance of the property for the weight function. For example if the weight function should calculate
+ * the weights only based on a global (shard) balance the index balance can be set to {@code 0.0} and will
+ * in turn have no effect on the distribution.
+ * </p>
+ * The weight per index is calculated based on the following formula:
+ * <ul>
+ * <li>
+ * <code>weight<sub>index</sub>(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index))</code>
+ * </li>
+ * <li>
+ * <code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code>
+ * </li>
+ * </ul>
+ * weight(node, index) = weightindex(node, index) + weightnode(node, index) + */ +public class WeightFunction { + + private final float theta0; + private final float theta1; + private final float theta2; + private final float theta3; + + public WeightFunction(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) { + float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance; + if (sum <= 0.0f) { + throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); + } + theta0 = shardBalance / sum; + theta1 = indexBalance / sum; + theta2 = writeLoadBalance / sum; + theta3 = diskUsageBalance / sum; + } + + float weight(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node, String index) { + final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); + final float nodeWeight = nodeWeight( + node.numShards(), + balancer.avgShardsPerNode(), + node.writeLoad(), + balancer.avgWriteLoadPerNode(), + node.diskUsageInBytes(), + balancer.avgDiskUsageInBytesPerNode() + ); + return nodeWeight + theta1 * weightIndex; + } + + public float nodeWeight( + int nodeNumShards, + float avgShardsPerNode, + double nodeWriteLoad, + double avgWriteLoadPerNode, + double diskUsageInBytes, + double avgDiskUsageInBytesPerNode + ) { + final float weightShard = nodeNumShards - avgShardsPerNode; + final float ingestLoad = (float) (nodeWriteLoad - avgWriteLoadPerNode); + final float diskUsage = (float) (diskUsageInBytes - avgDiskUsageInBytesPerNode); + return theta0 * weightShard + theta2 * ingestLoad + theta3 * diskUsage; + } + + float minWeightDelta(float shardWriteLoad, float shardSizeBytes) { + return theta0 * 1 + theta1 * 1 + theta2 * shardWriteLoad + theta3 * shardSizeBytes; + } + + public static float avgShardPerNode(Metadata metadata, RoutingNodes routingNodes) { + return ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); + } + + public static 
double avgWriteLoadPerNode(WriteLoadForecaster writeLoadForecaster, Metadata metadata, RoutingNodes routingNodes) { + return getTotalWriteLoad(writeLoadForecaster, metadata) / routingNodes.size(); + } + + public static double avgDiskUsageInBytesPerNode(ClusterInfo clusterInfo, Metadata metadata, RoutingNodes routingNodes) { + return ((double) getTotalDiskUsageInBytes(clusterInfo, metadata) / routingNodes.size()); + } + + private static double getTotalWriteLoad(WriteLoadForecaster writeLoadForecaster, Metadata metadata) { + double writeLoad = 0.0; + for (IndexMetadata indexMetadata : metadata.indices().values()) { + writeLoad += getIndexWriteLoad(writeLoadForecaster, indexMetadata); + } + return writeLoad; + } + + private static double getIndexWriteLoad(WriteLoadForecaster writeLoadForecaster, IndexMetadata indexMetadata) { + var shardWriteLoad = writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0); + return shardWriteLoad * numberOfCopies(indexMetadata); + } + + private static int numberOfCopies(IndexMetadata indexMetadata) { + return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas()); + } + + private static long getTotalDiskUsageInBytes(ClusterInfo clusterInfo, Metadata metadata) { + long totalDiskUsageInBytes = 0; + for (IndexMetadata indexMetadata : metadata.indices().values()) { + totalDiskUsageInBytes += getIndexDiskUsageInBytes(clusterInfo, indexMetadata); + } + return totalDiskUsageInBytes; + } + + // Visible for testing + static long getIndexDiskUsageInBytes(ClusterInfo clusterInfo, IndexMetadata indexMetadata) { + if (indexMetadata.ignoreDiskWatermarks()) { + // disk watermarks are ignored for partial searchable snapshots + // and is equivalent to indexMetadata.isPartialSearchableSnapshot() + return 0; + } + final long forecastedShardSize = indexMetadata.getForecastedShardSizeInBytes().orElse(-1L); + long totalSizeInBytes = 0; + int shardCount = 0; + for (int shard = 0; shard < indexMetadata.getNumberOfShards(); 
shard++) { + final ShardId shardId = new ShardId(indexMetadata.getIndex(), shard); + final long primaryShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, true, -1L)); + if (primaryShardSize != -1L) { + totalSizeInBytes += primaryShardSize; + shardCount++; + } + final long replicaShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, false, -1L)); + if (replicaShardSize != -1L) { + totalSizeInBytes += replicaShardSize * indexMetadata.getNumberOfReplicas(); + shardCount += indexMetadata.getNumberOfReplicas(); + } + } + if (shardCount == numberOfCopies(indexMetadata)) { + return totalSizeInBytes; + } + return shardCount == 0 ? 0 : (totalSizeInBytes / shardCount) * numberOfCopies(indexMetadata); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java index 0efa576a0cddc..35f1780464659 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java @@ -84,7 +84,7 @@ public void testShardStats() { clusterService, () -> clusterInfo, createShardAllocator(), - new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER) + new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings()) ); assertThat( service.stats(), @@ -125,7 +125,7 @@ public void testRelocatingShardIsOnlyCountedOnceOnTargetNode() { clusterService, EmptyClusterInfoService.INSTANCE, createShardAllocator(), - new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER) + new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings()) ); assertThat( service.stats(), @@ -182,7 +182,7 @@ public DesiredBalance getDesiredBalance() { ); } }, - new 
NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER) + new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings()) ); assertThat( service.stats(), diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 98c3451329f52..412329e51a485 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -59,8 +59,8 @@ import static java.util.stream.Collectors.toSet; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder; -import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.getIndexDiskUsageInBytes; import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING; +import static org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction.getIndexDiskUsageInBytes; import static org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index a041efc9ad3f1..75cd6da44724d 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -19,12 +19,12 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; 
import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodesHelper; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedShard; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; @@ -438,11 +438,13 @@ public void allocateUnassigned( } protected static final NodeAllocationStatsProvider EMPTY_NODE_ALLOCATION_STATS = new NodeAllocationStatsProvider( - WriteLoadForecaster.DEFAULT + WriteLoadForecaster.DEFAULT, + createBuiltInClusterSettings() ) { @Override - public Map stats( - ClusterState clusterState, + public Map stats( + Metadata metadata, + RoutingNodes routingNodes, ClusterInfo clusterInfo, @Nullable DesiredBalance desiredBalance ) {