diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java index b3ec4a5331180..355427c4e059b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java @@ -117,6 +117,15 @@ public void testDesiredBalanceMetrics() { assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds))); assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames))); } + final var currentNodeWeightsMetrics = telemetryPlugin.getDoubleGaugeMeasurement( + DesiredBalanceMetrics.CURRENT_NODE_WEIGHT_METRIC_NAME + ); + assertThat(currentNodeWeightsMetrics.size(), equalTo(2)); + for (var nodeStat : currentNodeWeightsMetrics) { + assertTrue(nodeStat.isDouble()); + assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds))); + assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames))); + } final var currentNodeShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement( DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME ); @@ -196,6 +205,7 @@ private static void assertMetricsAreBeingPublished(String nodeName, boolean shou testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME), matcher ); + assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WEIGHT_METRIC_NAME), matcher); assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WRITE_LOAD_METRIC_NAME), matcher); assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME), 
matcher); assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME), matcher); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 046f4b6b0b251..c2da33f8f4135 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -139,7 +139,7 @@ public ClusterModule( this.clusterPlugins = clusterPlugins; this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins); this.allocationDeciders = new AllocationDeciders(deciderList); - var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster); + var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster, clusterService.getClusterSettings()); this.shardsAllocator = createShardsAllocator( settings, clusterService.getClusterSettings(), diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java index 0c82faaaeaa45..b98e9050d2b4a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.function.Supplier; +import java.util.stream.Collectors; public class AllocationStatsService { private final ClusterService clusterService; @@ -39,6 +40,26 @@ public AllocationStatsService( } public Map stats() { - return nodeAllocationStatsProvider.stats(clusterService.state(), clusterInfoService.getClusterInfo(), desiredBalanceSupplier.get()); + var state = clusterService.state(); + var stats = nodeAllocationStatsProvider.stats( + state.metadata(), + 
state.getRoutingNodes(), + clusterInfoService.getClusterInfo(), + desiredBalanceSupplier.get() + ); + return stats.entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> new NodeAllocationStats( + e.getValue().shards(), + e.getValue().undesiredShards(), + e.getValue().forecastedIngestLoad(), + e.getValue().forecastedDiskUsage(), + e.getValue().currentDiskUsage() + ) + ) + ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java index 157b409be14d3..8368f5916ef91 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java @@ -10,11 +10,15 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.ClusterInfo; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; +import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; @@ -23,17 +27,47 @@ public class NodeAllocationStatsProvider { private final WriteLoadForecaster writeLoadForecaster; - public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster) { + private volatile float indexBalanceFactor; + private volatile float shardBalanceFactor; + 
private volatile float writeLoadBalanceFactor; + private volatile float diskUsageBalanceFactor; + + public record NodeAllocationAndClusterBalanceStats( + int shards, + int undesiredShards, + double forecastedIngestLoad, + long forecastedDiskUsage, + long currentDiskUsage, + float currentNodeWeight + ) {} + + public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster, ClusterSettings clusterSettings) { this.writeLoadForecaster = writeLoadForecaster; + clusterSettings.initializeAndWatch(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value); + clusterSettings.initializeAndWatch(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value); + clusterSettings.initializeAndWatch( + BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING, + value -> this.writeLoadBalanceFactor = value + ); + clusterSettings.initializeAndWatch( + BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING, + value -> this.diskUsageBalanceFactor = value + ); } - public Map stats( - ClusterState clusterState, + public Map stats( + Metadata metadata, + RoutingNodes routingNodes, ClusterInfo clusterInfo, @Nullable DesiredBalance desiredBalance ) { - var stats = Maps.newMapWithExpectedSize(clusterState.getRoutingNodes().size()); - for (RoutingNode node : clusterState.getRoutingNodes()) { + var weightFunction = new WeightFunction(shardBalanceFactor, indexBalanceFactor, writeLoadBalanceFactor, diskUsageBalanceFactor); + var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes); + var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes); + var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes); + + var stats = Maps.newMapWithExpectedSize(routingNodes.size()); + for (RoutingNode node : routingNodes) { int shards = 0; int undesiredShards = 0; double forecastedWriteLoad = 0.0; @@ -44,7 
+78,7 @@ public Map stats( continue; } shards++; - IndexMetadata indexMetadata = clusterState.metadata().getIndexSafe(shardRouting.index()); + IndexMetadata indexMetadata = metadata.getIndexSafe(shardRouting.index()); if (isDesiredAllocation(desiredBalance, shardRouting) == false) { undesiredShards++; } @@ -54,14 +88,23 @@ public Map stats( currentDiskUsage += shardSize; } + float currentNodeWeight = weightFunction.nodeWeight( + shards, + avgShardsPerNode, + forecastedWriteLoad, + avgWriteLoadPerNode, + currentDiskUsage, + avgDiskUsageInBytesPerNode + ); stats.put( node.nodeId(), - new NodeAllocationStats( + new NodeAllocationAndClusterBalanceStats( shards, desiredBalance != null ? undesiredShards : -1, forecastedWriteLoad, forecastedDiskUsage, - currentDiskUsage + currentDiskUsage, + currentNodeWeight ) ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 5b8fb0c7e9203..8dd1f14564ce9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -168,14 +168,17 @@ private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction w Map nodeLevelWeights = new HashMap<>(); for (var entry : balancer.nodes.entrySet()) { var node = entry.getValue(); + var nodeWeight = weightFunction.nodeWeight( + node.numShards(), + balancer.avgShardsPerNode(), + node.writeLoad(), + balancer.avgWriteLoadPerNode(), + node.diskUsageInBytes(), + balancer.avgDiskUsageInBytesPerNode() + ); nodeLevelWeights.put( node.routingNode.node(), - new DesiredBalanceMetrics.NodeWeightStats( - node.numShards(), - node.diskUsageInBytes(), - node.writeLoad(), - weightFunction.nodeWeight(balancer, node) - ) + new 
DesiredBalanceMetrics.NodeWeightStats(node.numShards(), node.diskUsageInBytes(), node.writeLoad(), nodeWeight) ); } allocation.routingNodes().setBalanceWeightStatsPerNode(nodeLevelWeights); @@ -252,65 +255,6 @@ public float getShardBalance() { return shardBalanceFactor; } - /** - * This class is the primary weight function used to create balanced over nodes and shards in the cluster. - * Currently this function has 3 properties: - *
<ul> - * <li><code>index balance</code> - balance property over shards per index</li> - * <li><code>shard balance</code> - balance property over shards per cluster</li> - * </ul> - * <p>
- * Each of these properties are expressed as factor such that the properties factor defines the relative - * importance of the property for the weight function. For example if the weight function should calculate - * the weights only based on a global (shard) balance the index balance can be set to {@code 0.0} and will - * in turn have no effect on the distribution. - * </p>
- * The weight per index is calculated based on the following formula: - * <ul> - * <li> - * <code>weight<sub>index</sub>(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index))</code> - * </li> - * <li> - * <code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code> - * </li> - * </ul>
- * weight(node, index) = weightindex(node, index) + weightnode(node, index) - */ - private static class WeightFunction { - - private final float theta0; - private final float theta1; - private final float theta2; - private final float theta3; - - WeightFunction(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) { - float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance; - if (sum <= 0.0f) { - throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); - } - theta0 = shardBalance / sum; - theta1 = indexBalance / sum; - theta2 = writeLoadBalance / sum; - theta3 = diskUsageBalance / sum; - } - - float weight(Balancer balancer, ModelNode node, String index) { - final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); - return nodeWeight(balancer, node) + theta1 * weightIndex; - } - - float nodeWeight(Balancer balancer, ModelNode node) { - final float weightShard = node.numShards() - balancer.avgShardsPerNode(); - final float ingestLoad = (float) (node.writeLoad() - balancer.avgWriteLoadPerNode()); - final float diskUsage = (float) (node.diskUsageInBytes() - balancer.avgDiskUsageInBytesPerNode()); - return theta0 * weightShard + theta2 * ingestLoad + theta3 * diskUsage; - } - - float minWeightDelta(Balancer balancer, String index) { - return theta0 * 1 + theta1 * 1 + theta2 * balancer.getShardWriteLoad(index) + theta3 * balancer.maxShardSizeBytes(index); - } - } - /** * A {@link Balancer} */ @@ -335,63 +279,13 @@ private Balancer(WriteLoadForecaster writeLoadForecaster, RoutingAllocation allo this.metadata = allocation.metadata(); this.weight = weight; this.threshold = threshold; - avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); - avgWriteLoadPerNode = getTotalWriteLoad(writeLoadForecaster, metadata) / routingNodes.size(); - avgDiskUsageInBytesPerNode = ((double) 
getTotalDiskUsageInBytes(allocation.clusterInfo(), metadata) / routingNodes.size()); + avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes); + avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes); + avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes); nodes = Collections.unmodifiableMap(buildModelFromAssigned()); sorter = newNodeSorter(); } - private static double getTotalWriteLoad(WriteLoadForecaster writeLoadForecaster, Metadata metadata) { - double writeLoad = 0.0; - for (IndexMetadata indexMetadata : metadata.indices().values()) { - writeLoad += getIndexWriteLoad(writeLoadForecaster, indexMetadata); - } - return writeLoad; - } - - private static double getIndexWriteLoad(WriteLoadForecaster writeLoadForecaster, IndexMetadata indexMetadata) { - var shardWriteLoad = writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0); - return shardWriteLoad * numberOfCopies(indexMetadata); - } - - private static long getTotalDiskUsageInBytes(ClusterInfo clusterInfo, Metadata metadata) { - long totalDiskUsageInBytes = 0; - for (IndexMetadata indexMetadata : metadata.indices().values()) { - totalDiskUsageInBytes += getIndexDiskUsageInBytes(clusterInfo, indexMetadata); - } - return totalDiskUsageInBytes; - } - - // Visible for testing - static long getIndexDiskUsageInBytes(ClusterInfo clusterInfo, IndexMetadata indexMetadata) { - if (indexMetadata.ignoreDiskWatermarks()) { - // disk watermarks are ignored for partial searchable snapshots - // and is equivalent to indexMetadata.isPartialSearchableSnapshot() - return 0; - } - final long forecastedShardSize = indexMetadata.getForecastedShardSizeInBytes().orElse(-1L); - long totalSizeInBytes = 0; - int shardCount = 0; - for (int shard = 0; shard < indexMetadata.getNumberOfShards(); shard++) { - final ShardId shardId = new ShardId(indexMetadata.getIndex(), shard); - final long 
primaryShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, true, -1L)); - if (primaryShardSize != -1L) { - totalSizeInBytes += primaryShardSize; - shardCount++; - } - final long replicaShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, false, -1L)); - if (replicaShardSize != -1L) { - totalSizeInBytes += replicaShardSize * indexMetadata.getNumberOfReplicas(); - shardCount += indexMetadata.getNumberOfReplicas(); - } - } - if (shardCount == numberOfCopies(indexMetadata)) { - return totalSizeInBytes; - } - return shardCount == 0 ? 0 : (totalSizeInBytes / shardCount) * numberOfCopies(indexMetadata); - } - private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) { if (indexMetadata.ignoreDiskWatermarks()) { // disk watermarks are ignored for partial searchable snapshots @@ -401,10 +295,6 @@ private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMet return Math.max(indexMetadata.getForecastedShardSizeInBytes().orElse(0L), clusterInfo.getShardSize(shardRouting, 0L)); } - private static int numberOfCopies(IndexMetadata indexMetadata) { - return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas()); - } - private float getShardWriteLoad(String index) { return (float) writeLoadForecaster.getForecastedWriteLoad(metadata.index(index)).orElse(0.0); } @@ -1433,7 +1323,7 @@ public float weight(ModelNode node) { } public float minWeightDelta() { - return function.minWeightDelta(balancer, index); + return function.minWeightDelta(balancer.getShardWriteLoad(index), balancer.maxShardSizeBytes(index)); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java index 9de95804b49b2..6ad44fdf3a9c0 100644 --- 
a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java @@ -21,7 +21,7 @@ * * @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated * @param weightsPerNode The node weights calculated based on - * {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.WeightFunction#nodeWeight} + * {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#nodeWeight} */ public record DesiredBalance( long lastConvergedIndex, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index cf8840dc95724..9f6487bdc8abd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -10,7 +10,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats; +import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider.NodeAllocationAndClusterBalanceStats; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; import org.elasticsearch.telemetry.metric.LongWithAttributes; import org.elasticsearch.telemetry.metric.MeterRegistry; @@ -41,6 +41,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w public static final String DESIRED_BALANCE_NODE_DISK_USAGE_METRIC_NAME = "es.allocator.desired_balance.allocations.node_disk_usage_bytes.current"; + public static final String CURRENT_NODE_WEIGHT_METRIC_NAME = 
"es.allocator.allocations.node.weight.current"; public static final String CURRENT_NODE_SHARD_COUNT_METRIC_NAME = "es.allocator.allocations.node.shard_count.current"; public static final String CURRENT_NODE_WRITE_LOAD_METRIC_NAME = "es.allocator.allocations.node.write_load.current"; public static final String CURRENT_NODE_DISK_USAGE_METRIC_NAME = "es.allocator.allocations.node.disk_usage_bytes.current"; @@ -68,12 +69,13 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w private volatile long undesiredAllocations; private final AtomicReference> weightStatsPerNodeRef = new AtomicReference<>(Map.of()); - private final AtomicReference> allocationStatsPerNodeRef = new AtomicReference<>(Map.of()); + private final AtomicReference> allocationStatsPerNodeRef = + new AtomicReference<>(Map.of()); public void updateMetrics( AllocationStats allocationStats, Map weightStatsPerNode, - Map nodeAllocationStats + Map nodeAllocationStats ) { assert allocationStats != null : "allocation stats cannot be null"; assert weightStatsPerNode != null : "node balance weight stats cannot be null"; @@ -124,6 +126,12 @@ public DesiredBalanceMetrics(MeterRegistry meterRegistry) { "bytes", this::getDesiredBalanceNodeDiskUsageMetrics ); + meterRegistry.registerDoublesGauge( + CURRENT_NODE_WEIGHT_METRIC_NAME, + "The weight of nodes based on the current allocation state", + "unit", + this::getCurrentNodeWeightMetrics + ); meterRegistry.registerLongsGauge( DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME, "Shard count of nodes in the computed desired balance", @@ -291,6 +299,18 @@ private List getCurrentNodeUndesiredShardCountMetrics() { return values; } + private List getCurrentNodeWeightMetrics() { + if (nodeIsMaster == false) { + return List.of(); + } + var stats = allocationStatsPerNodeRef.get(); + List doubles = new ArrayList<>(stats.size()); + for (var node : stats.keySet()) { + doubles.add(new DoubleWithAttributes(stats.get(node).currentNodeWeight(), 
getNodeAttributes(node))); + } + return doubles; + } + private Map getNodeAttributes(DiscoveryNode node) { return Map.of("node_id", node.getId(), "node_name", node.getName()); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 5ad29debc8f20..2ee905634f760 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -20,8 +20,8 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider; +import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider.NodeAllocationAndClusterBalanceStats; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics.AllocationStats; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -159,8 +159,13 @@ void run() { } private void updateDesireBalanceMetrics(AllocationStats allocationStats) { - var stats = nodeAllocationStatsProvider.stats(allocation.getClusterState(), allocation.clusterInfo(), desiredBalance); - Map nodeAllocationStats = new HashMap<>(stats.size()); + var stats = nodeAllocationStatsProvider.stats( + allocation.metadata(), + allocation.routingNodes(), + allocation.clusterInfo(), + desiredBalance + ); + Map nodeAllocationStats = new HashMap<>(stats.size()); for (var entry : stats.entrySet()) { var node = allocation.nodes().get(entry.getKey()); if (node != null) { 
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java new file mode 100644 index 0000000000000..7203a92b147f6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; +import org.elasticsearch.index.shard.ShardId; + +/** + * This class is the primary weight function used to create balanced over nodes and shards in the cluster. + * Currently this function has 3 properties: + *
<ul> + * <li><code>index balance</code> - balance property over shards per index</li> + * <li><code>shard balance</code> - balance property over shards per cluster</li> + * </ul> + * <p>
+ * Each of these properties are expressed as factor such that the properties factor defines the relative + * importance of the property for the weight function. For example if the weight function should calculate + * the weights only based on a global (shard) balance the index balance can be set to {@code 0.0} and will + * in turn have no effect on the distribution. + * </p>
+ * The weight per index is calculated based on the following formula: + * <ul> + * <li> + * <code>weight<sub>index</sub>(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index))</code> + * </li> + * <li> + * <code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code> + * </li> + * </ul>
+ * weight(node, index) = weightindex(node, index) + weightnode(node, index) + */ +public class WeightFunction { + + private final float theta0; + private final float theta1; + private final float theta2; + private final float theta3; + + public WeightFunction(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) { + float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance; + if (sum <= 0.0f) { + throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); + } + theta0 = shardBalance / sum; + theta1 = indexBalance / sum; + theta2 = writeLoadBalance / sum; + theta3 = diskUsageBalance / sum; + } + + float weight(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node, String index) { + final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); + final float nodeWeight = nodeWeight( + node.numShards(), + balancer.avgShardsPerNode(), + node.writeLoad(), + balancer.avgWriteLoadPerNode(), + node.diskUsageInBytes(), + balancer.avgDiskUsageInBytesPerNode() + ); + return nodeWeight + theta1 * weightIndex; + } + + public float nodeWeight( + int nodeNumShards, + float avgShardsPerNode, + double nodeWriteLoad, + double avgWriteLoadPerNode, + double diskUsageInBytes, + double avgDiskUsageInBytesPerNode + ) { + final float weightShard = nodeNumShards - avgShardsPerNode; + final float ingestLoad = (float) (nodeWriteLoad - avgWriteLoadPerNode); + final float diskUsage = (float) (diskUsageInBytes - avgDiskUsageInBytesPerNode); + return theta0 * weightShard + theta2 * ingestLoad + theta3 * diskUsage; + } + + float minWeightDelta(float shardWriteLoad, float shardSizeBytes) { + return theta0 * 1 + theta1 * 1 + theta2 * shardWriteLoad + theta3 * shardSizeBytes; + } + + public static float avgShardPerNode(Metadata metadata, RoutingNodes routingNodes) { + return ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); + } + + public static 
double avgWriteLoadPerNode(WriteLoadForecaster writeLoadForecaster, Metadata metadata, RoutingNodes routingNodes) { + return getTotalWriteLoad(writeLoadForecaster, metadata) / routingNodes.size(); + } + + public static double avgDiskUsageInBytesPerNode(ClusterInfo clusterInfo, Metadata metadata, RoutingNodes routingNodes) { + return ((double) getTotalDiskUsageInBytes(clusterInfo, metadata) / routingNodes.size()); + } + + private static double getTotalWriteLoad(WriteLoadForecaster writeLoadForecaster, Metadata metadata) { + double writeLoad = 0.0; + for (IndexMetadata indexMetadata : metadata.indices().values()) { + writeLoad += getIndexWriteLoad(writeLoadForecaster, indexMetadata); + } + return writeLoad; + } + + private static double getIndexWriteLoad(WriteLoadForecaster writeLoadForecaster, IndexMetadata indexMetadata) { + var shardWriteLoad = writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0); + return shardWriteLoad * numberOfCopies(indexMetadata); + } + + private static int numberOfCopies(IndexMetadata indexMetadata) { + return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas()); + } + + private static long getTotalDiskUsageInBytes(ClusterInfo clusterInfo, Metadata metadata) { + long totalDiskUsageInBytes = 0; + for (IndexMetadata indexMetadata : metadata.indices().values()) { + totalDiskUsageInBytes += getIndexDiskUsageInBytes(clusterInfo, indexMetadata); + } + return totalDiskUsageInBytes; + } + + // Visible for testing + static long getIndexDiskUsageInBytes(ClusterInfo clusterInfo, IndexMetadata indexMetadata) { + if (indexMetadata.ignoreDiskWatermarks()) { + // disk watermarks are ignored for partial searchable snapshots + // and is equivalent to indexMetadata.isPartialSearchableSnapshot() + return 0; + } + final long forecastedShardSize = indexMetadata.getForecastedShardSizeInBytes().orElse(-1L); + long totalSizeInBytes = 0; + int shardCount = 0; + for (int shard = 0; shard < indexMetadata.getNumberOfShards(); 
shard++) { + final ShardId shardId = new ShardId(indexMetadata.getIndex(), shard); + final long primaryShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, true, -1L)); + if (primaryShardSize != -1L) { + totalSizeInBytes += primaryShardSize; + shardCount++; + } + final long replicaShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, false, -1L)); + if (replicaShardSize != -1L) { + totalSizeInBytes += replicaShardSize * indexMetadata.getNumberOfReplicas(); + shardCount += indexMetadata.getNumberOfReplicas(); + } + } + if (shardCount == numberOfCopies(indexMetadata)) { + return totalSizeInBytes; + } + return shardCount == 0 ? 0 : (totalSizeInBytes / shardCount) * numberOfCopies(indexMetadata); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java index 0efa576a0cddc..35f1780464659 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java @@ -84,7 +84,7 @@ public void testShardStats() { clusterService, () -> clusterInfo, createShardAllocator(), - new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER) + new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings()) ); assertThat( service.stats(), @@ -125,7 +125,7 @@ public void testRelocatingShardIsOnlyCountedOnceOnTargetNode() { clusterService, EmptyClusterInfoService.INSTANCE, createShardAllocator(), - new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER) + new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings()) ); assertThat( service.stats(), @@ -182,7 +182,7 @@ public DesiredBalance getDesiredBalance() { ); } }, - new 
NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER) + new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings()) ); assertThat( service.stats(), diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 98c3451329f52..412329e51a485 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -59,8 +59,8 @@ import static java.util.stream.Collectors.toSet; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder; -import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.getIndexDiskUsageInBytes; import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING; +import static org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction.getIndexDiskUsageInBytes; import static org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index a041efc9ad3f1..75cd6da44724d 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -19,12 +19,12 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; 
import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodesHelper; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedShard; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; @@ -438,11 +438,13 @@ public void allocateUnassigned( } protected static final NodeAllocationStatsProvider EMPTY_NODE_ALLOCATION_STATS = new NodeAllocationStatsProvider( - WriteLoadForecaster.DEFAULT + WriteLoadForecaster.DEFAULT, + createBuiltInClusterSettings() ) { @Override - public Map stats( - ClusterState clusterState, + public Map stats( + Metadata metadata, + RoutingNodes routingNodes, ClusterInfo clusterInfo, @Nullable DesiredBalance desiredBalance ) {