
Commit

Merge branch 'main' into InternalSignificantTermsLean
iverase authored Nov 26, 2024
2 parents 6693621 + b22d185 commit c031d01
Showing 21 changed files with 435 additions and 158 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/117551.yaml
@@ -0,0 +1,5 @@
pr: 117551
summary: Fix stats by constant expression with alias
area: ES|QL
type: bug
issues: []
1 change: 1 addition & 0 deletions docs/reference/data-streams/tsds.asciidoc
@@ -339,4 +339,5 @@ include::tsds-index-settings.asciidoc[]
include::downsampling.asciidoc[]
include::downsampling-ilm.asciidoc[]
include::downsampling-manual.asciidoc[]
include::downsampling-dsl.asciidoc[]
include::tsds-reindex.asciidoc[]
3 changes: 0 additions & 3 deletions muted-tests.yml
@@ -226,9 +226,6 @@ tests:
- class: org.elasticsearch.xpack.inference.InferenceRestIT
method: test {p0=inference/30_semantic_text_inference/Calculates embeddings using the default ELSER 2 endpoint}
issue: https://github.com/elastic/elasticsearch/issues/117349
- class: org.elasticsearch.xpack.security.operator.OperatorPrivilegesIT
method: testEveryActionIsEitherOperatorOnlyOrNonOperator
issue: https://github.com/elastic/elasticsearch/issues/102992
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_reset/Test reset running transform}
issue: https://github.com/elastic/elasticsearch/issues/117473
@@ -117,6 +117,15 @@ public void testDesiredBalanceMetrics() {
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
final var currentNodeWeightsMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
DesiredBalanceMetrics.CURRENT_NODE_WEIGHT_METRIC_NAME
);
assertThat(currentNodeWeightsMetrics.size(), equalTo(2));
for (var nodeStat : currentNodeWeightsMetrics) {
assertTrue(nodeStat.isDouble());
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
final var currentNodeShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement(
DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME
);
@@ -196,6 +205,7 @@ private static void assertMetricsAreBeingPublished(String nodeName, boolean shou
testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME),
matcher
);
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WEIGHT_METRIC_NAME), matcher);
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WRITE_LOAD_METRIC_NAME), matcher);
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME), matcher);
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME), matcher);
@@ -139,7 +139,7 @@ public ClusterModule(
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
this.allocationDeciders = new AllocationDeciders(deciderList);
var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster);
var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster, clusterService.getClusterSettings());
this.shardsAllocator = createShardsAllocator(
settings,
clusterService.getClusterSettings(),
@@ -17,6 +17,7 @@

import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class AllocationStatsService {
private final ClusterService clusterService;
@@ -39,6 +40,26 @@ public AllocationStatsService(
}

public Map<String, NodeAllocationStats> stats() {
return nodeAllocationStatsProvider.stats(clusterService.state(), clusterInfoService.getClusterInfo(), desiredBalanceSupplier.get());
var state = clusterService.state();
var stats = nodeAllocationStatsProvider.stats(
state.metadata(),
state.getRoutingNodes(),
clusterInfoService.getClusterInfo(),
desiredBalanceSupplier.get()
);
return stats.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> new NodeAllocationStats(
e.getValue().shards(),
e.getValue().undesiredShards(),
e.getValue().forecastedIngestLoad(),
e.getValue().forecastedDiskUsage(),
e.getValue().currentDiskUsage()
)
)
);
}
}
@@ -10,11 +10,15 @@
package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Nullable;

@@ -23,17 +27,47 @@
public class NodeAllocationStatsProvider {
private final WriteLoadForecaster writeLoadForecaster;

public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster) {
private volatile float indexBalanceFactor;
private volatile float shardBalanceFactor;
private volatile float writeLoadBalanceFactor;
private volatile float diskUsageBalanceFactor;

public record NodeAllocationAndClusterBalanceStats(
int shards,
int undesiredShards,
double forecastedIngestLoad,
long forecastedDiskUsage,
long currentDiskUsage,
float currentNodeWeight
) {}

public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster, ClusterSettings clusterSettings) {
this.writeLoadForecaster = writeLoadForecaster;
clusterSettings.initializeAndWatch(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value);
clusterSettings.initializeAndWatch(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value);
clusterSettings.initializeAndWatch(
BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING,
value -> this.writeLoadBalanceFactor = value
);
clusterSettings.initializeAndWatch(
BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING,
value -> this.diskUsageBalanceFactor = value
);
}

public Map<String, NodeAllocationStats> stats(
ClusterState clusterState,
public Map<String, NodeAllocationAndClusterBalanceStats> stats(
Metadata metadata,
RoutingNodes routingNodes,
ClusterInfo clusterInfo,
@Nullable DesiredBalance desiredBalance
) {
var stats = Maps.<String, NodeAllocationStats>newMapWithExpectedSize(clusterState.getRoutingNodes().size());
for (RoutingNode node : clusterState.getRoutingNodes()) {
var weightFunction = new WeightFunction(shardBalanceFactor, indexBalanceFactor, writeLoadBalanceFactor, diskUsageBalanceFactor);
var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes);

var stats = Maps.<String, NodeAllocationAndClusterBalanceStats>newMapWithExpectedSize(routingNodes.size());
for (RoutingNode node : routingNodes) {
int shards = 0;
int undesiredShards = 0;
double forecastedWriteLoad = 0.0;
@@ -44,7 +78,7 @@ public Map<String, NodeAllocationStats> stats(
continue;
}
shards++;
IndexMetadata indexMetadata = clusterState.metadata().getIndexSafe(shardRouting.index());
IndexMetadata indexMetadata = metadata.getIndexSafe(shardRouting.index());
if (isDesiredAllocation(desiredBalance, shardRouting) == false) {
undesiredShards++;
}
@@ -54,14 +88,23 @@ public Map<String, NodeAllocationStats> stats(
currentDiskUsage += shardSize;

}
float currentNodeWeight = weightFunction.nodeWeight(
shards,
avgShardsPerNode,
forecastedWriteLoad,
avgWriteLoadPerNode,
currentDiskUsage,
avgDiskUsageInBytesPerNode
);
stats.put(
node.nodeId(),
new NodeAllocationStats(
new NodeAllocationAndClusterBalanceStats(
shards,
desiredBalance != null ? undesiredShards : -1,
forecastedWriteLoad,
forecastedDiskUsage,
currentDiskUsage
currentDiskUsage,
currentNodeWeight
)
);
}
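For context on the currentNodeWeight computed above: this commit reuses the allocator's WeightFunction (extracted from BalancedShardsAllocator, see the deletions further down in this commit) to report a per-node balance weight. Below is a minimal, self-contained sketch of that calculation under the same formula; the class and method names are illustrative placeholders, not the actual Elasticsearch API.

// A standalone sketch (not the Elasticsearch implementation) of the node-weight
// calculation invoked above. The balance factors are normalized so they sum to 1,
// and a node's weight is the factor-weighted deviation of its shard count, write
// load and disk usage from the cluster-wide averages.
final class NodeWeightSketch {

    private final float shardFactor;
    private final float writeLoadFactor;
    private final float diskUsageFactor;

    NodeWeightSketch(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) {
        float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance;
        if (sum <= 0.0f) {
            throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
        }
        // Only the shard, write-load and disk-usage terms contribute to the per-node
        // weight; the index factor only matters for the per-index weight, which is
        // out of scope for this sketch.
        this.shardFactor = shardBalance / sum;
        this.writeLoadFactor = writeLoadBalance / sum;
        this.diskUsageFactor = diskUsageBalance / sum;
    }

    float nodeWeight(
        int nodeShards,
        float avgShardsPerNode,
        double nodeWriteLoad,
        double avgWriteLoadPerNode,
        double nodeDiskUsageInBytes,
        double avgDiskUsageInBytesPerNode
    ) {
        final float shardDelta = nodeShards - avgShardsPerNode;
        final float writeLoadDelta = (float) (nodeWriteLoad - avgWriteLoadPerNode);
        final float diskUsageDelta = (float) (nodeDiskUsageInBytes - avgDiskUsageInBytesPerNode);
        return shardFactor * shardDelta + writeLoadFactor * writeLoadDelta + diskUsageFactor * diskUsageDelta;
    }
}

With this normalization, a node sitting exactly at the cluster averages gets a weight of 0.0, while nodes carrying more shards, write load or disk than average get positive weights, which is the per-node value the new CURRENT_NODE_WEIGHT_METRIC_NAME gauge exposes.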
@@ -168,14 +168,17 @@ private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction w
Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> nodeLevelWeights = new HashMap<>();
for (var entry : balancer.nodes.entrySet()) {
var node = entry.getValue();
var nodeWeight = weightFunction.nodeWeight(
node.numShards(),
balancer.avgShardsPerNode(),
node.writeLoad(),
balancer.avgWriteLoadPerNode(),
node.diskUsageInBytes(),
balancer.avgDiskUsageInBytesPerNode()
);
nodeLevelWeights.put(
node.routingNode.node(),
new DesiredBalanceMetrics.NodeWeightStats(
node.numShards(),
node.diskUsageInBytes(),
node.writeLoad(),
weightFunction.nodeWeight(balancer, node)
)
new DesiredBalanceMetrics.NodeWeightStats(node.numShards(), node.diskUsageInBytes(), node.writeLoad(), nodeWeight)
);
}
allocation.routingNodes().setBalanceWeightStatsPerNode(nodeLevelWeights);
@@ -252,65 +255,6 @@ public float getShardBalance() {
return shardBalanceFactor;
}

/**
* This class is the primary weight function used to create balanced over nodes and shards in the cluster.
* Currently this function has 3 properties:
* <ul>
* <li><code>index balance</code> - balance property over shards per index</li>
* <li><code>shard balance</code> - balance property over shards per cluster</li>
* </ul>
* <p>
* Each of these properties are expressed as factor such that the properties factor defines the relative
* importance of the property for the weight function. For example if the weight function should calculate
* the weights only based on a global (shard) balance the index balance can be set to {@code 0.0} and will
* in turn have no effect on the distribution.
* </p>
* The weight per index is calculated based on the following formula:
* <ul>
* <li>
* <code>weight<sub>index</sub>(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index))</code>
* </li>
* <li>
* <code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code>
* </li>
* </ul>
* <code>weight(node, index) = weight<sub>index</sub>(node, index) + weight<sub>node</sub>(node, index)</code>
*/
private static class WeightFunction {

private final float theta0;
private final float theta1;
private final float theta2;
private final float theta3;

WeightFunction(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) {
float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
}
theta0 = shardBalance / sum;
theta1 = indexBalance / sum;
theta2 = writeLoadBalance / sum;
theta3 = diskUsageBalance / sum;
}

float weight(Balancer balancer, ModelNode node, String index) {
final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
return nodeWeight(balancer, node) + theta1 * weightIndex;
}

float nodeWeight(Balancer balancer, ModelNode node) {
final float weightShard = node.numShards() - balancer.avgShardsPerNode();
final float ingestLoad = (float) (node.writeLoad() - balancer.avgWriteLoadPerNode());
final float diskUsage = (float) (node.diskUsageInBytes() - balancer.avgDiskUsageInBytesPerNode());
return theta0 * weightShard + theta2 * ingestLoad + theta3 * diskUsage;
}

float minWeightDelta(Balancer balancer, String index) {
return theta0 * 1 + theta1 * 1 + theta2 * balancer.getShardWriteLoad(index) + theta3 * balancer.maxShardSizeBytes(index);
}
}

/**
* A {@link Balancer}
*/
@@ -335,63 +279,13 @@ private Balancer(WriteLoadForecaster writeLoadForecaster, RoutingAllocation allo
this.metadata = allocation.metadata();
this.weight = weight;
this.threshold = threshold;
avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size();
avgWriteLoadPerNode = getTotalWriteLoad(writeLoadForecaster, metadata) / routingNodes.size();
avgDiskUsageInBytesPerNode = ((double) getTotalDiskUsageInBytes(allocation.clusterInfo(), metadata) / routingNodes.size());
avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes);
nodes = Collections.unmodifiableMap(buildModelFromAssigned());
sorter = newNodeSorter();
}

private static double getTotalWriteLoad(WriteLoadForecaster writeLoadForecaster, Metadata metadata) {
double writeLoad = 0.0;
for (IndexMetadata indexMetadata : metadata.indices().values()) {
writeLoad += getIndexWriteLoad(writeLoadForecaster, indexMetadata);
}
return writeLoad;
}

private static double getIndexWriteLoad(WriteLoadForecaster writeLoadForecaster, IndexMetadata indexMetadata) {
var shardWriteLoad = writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
return shardWriteLoad * numberOfCopies(indexMetadata);
}

private static long getTotalDiskUsageInBytes(ClusterInfo clusterInfo, Metadata metadata) {
long totalDiskUsageInBytes = 0;
for (IndexMetadata indexMetadata : metadata.indices().values()) {
totalDiskUsageInBytes += getIndexDiskUsageInBytes(clusterInfo, indexMetadata);
}
return totalDiskUsageInBytes;
}

// Visible for testing
static long getIndexDiskUsageInBytes(ClusterInfo clusterInfo, IndexMetadata indexMetadata) {
if (indexMetadata.ignoreDiskWatermarks()) {
// disk watermarks are ignored for partial searchable snapshots
// and is equivalent to indexMetadata.isPartialSearchableSnapshot()
return 0;
}
final long forecastedShardSize = indexMetadata.getForecastedShardSizeInBytes().orElse(-1L);
long totalSizeInBytes = 0;
int shardCount = 0;
for (int shard = 0; shard < indexMetadata.getNumberOfShards(); shard++) {
final ShardId shardId = new ShardId(indexMetadata.getIndex(), shard);
final long primaryShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, true, -1L));
if (primaryShardSize != -1L) {
totalSizeInBytes += primaryShardSize;
shardCount++;
}
final long replicaShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, false, -1L));
if (replicaShardSize != -1L) {
totalSizeInBytes += replicaShardSize * indexMetadata.getNumberOfReplicas();
shardCount += indexMetadata.getNumberOfReplicas();
}
}
if (shardCount == numberOfCopies(indexMetadata)) {
return totalSizeInBytes;
}
return shardCount == 0 ? 0 : (totalSizeInBytes / shardCount) * numberOfCopies(indexMetadata);
}

private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) {
if (indexMetadata.ignoreDiskWatermarks()) {
// disk watermarks are ignored for partial searchable snapshots
@@ -401,10 +295,6 @@ private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMet
return Math.max(indexMetadata.getForecastedShardSizeInBytes().orElse(0L), clusterInfo.getShardSize(shardRouting, 0L));
}

private static int numberOfCopies(IndexMetadata indexMetadata) {
return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas());
}

private float getShardWriteLoad(String index) {
return (float) writeLoadForecaster.getForecastedWriteLoad(metadata.index(index)).orElse(0.0);
}
@@ -1433,7 +1323,7 @@ public float weight(ModelNode node) {
}

public float minWeightDelta() {
return function.minWeightDelta(balancer, index);
return function.minWeightDelta(balancer.getShardWriteLoad(index), balancer.maxShardSizeBytes(index));
}

@Override
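The Balancer helpers deleted above (getTotalWriteLoad, getIndexWriteLoad, getTotalDiskUsageInBytes, numberOfCopies) are replaced by calls to WeightFunction.avgShardPerNode, avgWriteLoadPerNode and avgDiskUsageInBytesPerNode, so the same averages can also feed NodeAllocationStatsProvider. The following is a rough standalone sketch of what those averages presumably compute, mirroring the totals-divided-by-node-count logic removed above; the record type is a placeholder for the real IndexMetadata/ClusterInfo inputs, and the disk figure uses only the forecasted shard size, whereas the removed code also consulted observed shard sizes from ClusterInfo.

import java.util.List;

// Illustrative only: cluster-wide averages as totals divided by the number of data nodes.
final class ClusterAveragesSketch {

    // Minimal stand-in for per-index metadata and forecasts.
    record IndexInfo(int shards, int replicas, double forecastedShardWriteLoad, long forecastedShardSizeInBytes) {
        int copies() {
            return shards * (1 + replicas); // primaries plus replicas, as numberOfCopies() did
        }
    }

    static float avgShardsPerNode(List<IndexInfo> indices, int dataNodes) {
        float totalShards = 0;
        for (IndexInfo index : indices) {
            totalShards += index.copies();
        }
        return totalShards / dataNodes;
    }

    static double avgWriteLoadPerNode(List<IndexInfo> indices, int dataNodes) {
        double totalWriteLoad = 0.0;
        for (IndexInfo index : indices) {
            totalWriteLoad += index.forecastedShardWriteLoad() * index.copies();
        }
        return totalWriteLoad / dataNodes;
    }

    static double avgDiskUsageInBytesPerNode(List<IndexInfo> indices, int dataNodes) {
        double totalBytes = 0.0;
        for (IndexInfo index : indices) {
            totalBytes += (double) index.forecastedShardSizeInBytes() * index.copies();
        }
        return totalBytes / dataNodes;
    }
}

Each total is divided by the number of data nodes, matching the routingNodes.size() divisor in the Balancer constructor code removed above.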
@@ -21,7 +21,7 @@
*
* @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated
* @param weightsPerNode The node weights calculated based on
* {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.WeightFunction#nodeWeight}
* {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#nodeWeight}
*/
public record DesiredBalance(
long lastConvergedIndex,
