From e9f899ee6913fe00dc8ef7a4254c76e8dca31b47 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Tue, 26 Nov 2024 16:44:15 +0100 Subject: [PATCH 1/5] Add current node weight as an APM metric (#117557) As discussed previously, the current node weight (calculated the same way that we calculate for the desired balance computations) might also be useful to have as a metric. The difference is that the current node weight is calculated based on the current cluster state rather than the internal state of the BalancedShardsAllocator (i.e. Balancer and ModelNode). To share all the weight calculation logic I had to move out the weight function and a few related utilities. NodeAllocationStatsProvider is still shared by both the AllocationStatsService and the desired balance metric collection. Relates ES-10080 --- .../DesiredBalanceReconcilerMetricsIT.java | 10 ++ .../elasticsearch/cluster/ClusterModule.java | 2 +- .../allocation/AllocationStatsService.java | 23 ++- .../NodeAllocationStatsProvider.java | 61 ++++++- .../allocator/BalancedShardsAllocator.java | 136 ++------------- .../allocation/allocator/DesiredBalance.java | 2 +- .../allocator/DesiredBalanceMetrics.java | 26 ++- .../allocator/DesiredBalanceReconciler.java | 11 +- .../allocation/allocator/WeightFunction.java | 157 ++++++++++++++++++ .../AllocationStatsServiceTests.java | 6 +- .../BalancedShardsAllocatorTests.java | 2 +- .../cluster/ESAllocationTestCase.java | 10 +- 12 files changed, 297 insertions(+), 149 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java index b3ec4a5331180..355427c4e059b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java @@ -117,6 +117,15 @@ public void testDesiredBalanceMetrics() { assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds))); assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames))); } + final var currentNodeWeightsMetrics = telemetryPlugin.getDoubleGaugeMeasurement( + DesiredBalanceMetrics.CURRENT_NODE_WEIGHT_METRIC_NAME + ); + assertThat(currentNodeWeightsMetrics.size(), equalTo(2)); + for (var nodeStat : currentNodeWeightsMetrics) { + assertTrue(nodeStat.isDouble()); + assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds))); + assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames))); + } final var currentNodeShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement( DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME ); @@ -196,6 +205,7 @@ private static void assertMetricsAreBeingPublished(String nodeName, boolean shou testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME), matcher ); + assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WEIGHT_METRIC_NAME), matcher); assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WRITE_LOAD_METRIC_NAME), matcher); 
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME), matcher); assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME), matcher); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 046f4b6b0b251..c2da33f8f4135 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -139,7 +139,7 @@ public ClusterModule( this.clusterPlugins = clusterPlugins; this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins); this.allocationDeciders = new AllocationDeciders(deciderList); - var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster); + var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster, clusterService.getClusterSettings()); this.shardsAllocator = createShardsAllocator( settings, clusterService.getClusterSettings(), diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java index 0c82faaaeaa45..b98e9050d2b4a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.function.Supplier; +import java.util.stream.Collectors; public class AllocationStatsService { private final ClusterService clusterService; @@ -39,6 +40,26 @@ public AllocationStatsService( } public Map stats() { - return nodeAllocationStatsProvider.stats(clusterService.state(), clusterInfoService.getClusterInfo(), desiredBalanceSupplier.get()); + var state = clusterService.state(); + var stats = nodeAllocationStatsProvider.stats( + state.metadata(), + state.getRoutingNodes(), + clusterInfoService.getClusterInfo(), + desiredBalanceSupplier.get() + ); + return stats.entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> new NodeAllocationStats( + e.getValue().shards(), + e.getValue().undesiredShards(), + e.getValue().forecastedIngestLoad(), + e.getValue().forecastedDiskUsage(), + e.getValue().currentDiskUsage() + ) + ) + ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java index 157b409be14d3..8368f5916ef91 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java @@ -10,11 +10,15 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.ClusterInfo; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import 
org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; +import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; @@ -23,17 +27,47 @@ public class NodeAllocationStatsProvider { private final WriteLoadForecaster writeLoadForecaster; - public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster) { + private volatile float indexBalanceFactor; + private volatile float shardBalanceFactor; + private volatile float writeLoadBalanceFactor; + private volatile float diskUsageBalanceFactor; + + public record NodeAllocationAndClusterBalanceStats( + int shards, + int undesiredShards, + double forecastedIngestLoad, + long forecastedDiskUsage, + long currentDiskUsage, + float currentNodeWeight + ) {} + + public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster, ClusterSettings clusterSettings) { this.writeLoadForecaster = writeLoadForecaster; + clusterSettings.initializeAndWatch(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value); + clusterSettings.initializeAndWatch(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value); + clusterSettings.initializeAndWatch( + BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING, + value -> this.writeLoadBalanceFactor = value + ); + clusterSettings.initializeAndWatch( + BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING, + value -> this.diskUsageBalanceFactor = value + ); } - public Map stats( - ClusterState clusterState, + public Map stats( + Metadata metadata, + RoutingNodes routingNodes, ClusterInfo clusterInfo, @Nullable DesiredBalance desiredBalance ) { - var stats = Maps.newMapWithExpectedSize(clusterState.getRoutingNodes().size()); - for (RoutingNode node : clusterState.getRoutingNodes()) { + var weightFunction = new WeightFunction(shardBalanceFactor, indexBalanceFactor, writeLoadBalanceFactor, diskUsageBalanceFactor); + var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes); + var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes); + var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes); + + var stats = Maps.newMapWithExpectedSize(routingNodes.size()); + for (RoutingNode node : routingNodes) { int shards = 0; int undesiredShards = 0; double forecastedWriteLoad = 0.0; @@ -44,7 +78,7 @@ public Map stats( continue; } shards++; - IndexMetadata indexMetadata = clusterState.metadata().getIndexSafe(shardRouting.index()); + IndexMetadata indexMetadata = metadata.getIndexSafe(shardRouting.index()); if (isDesiredAllocation(desiredBalance, shardRouting) == false) { undesiredShards++; } @@ -54,14 +88,23 @@ public Map stats( currentDiskUsage += shardSize; } + float currentNodeWeight = weightFunction.nodeWeight( + shards, + avgShardsPerNode, + forecastedWriteLoad, + avgWriteLoadPerNode, + currentDiskUsage, + avgDiskUsageInBytesPerNode + ); stats.put( node.nodeId(), - new NodeAllocationStats( + new NodeAllocationAndClusterBalanceStats( shards, desiredBalance != null ? 
undesiredShards : -1, forecastedWriteLoad, forecastedDiskUsage, - currentDiskUsage + currentDiskUsage, + currentNodeWeight ) ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 5b8fb0c7e9203..8dd1f14564ce9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -168,14 +168,17 @@ private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction w Map nodeLevelWeights = new HashMap<>(); for (var entry : balancer.nodes.entrySet()) { var node = entry.getValue(); + var nodeWeight = weightFunction.nodeWeight( + node.numShards(), + balancer.avgShardsPerNode(), + node.writeLoad(), + balancer.avgWriteLoadPerNode(), + node.diskUsageInBytes(), + balancer.avgDiskUsageInBytesPerNode() + ); nodeLevelWeights.put( node.routingNode.node(), - new DesiredBalanceMetrics.NodeWeightStats( - node.numShards(), - node.diskUsageInBytes(), - node.writeLoad(), - weightFunction.nodeWeight(balancer, node) - ) + new DesiredBalanceMetrics.NodeWeightStats(node.numShards(), node.diskUsageInBytes(), node.writeLoad(), nodeWeight) ); } allocation.routingNodes().setBalanceWeightStatsPerNode(nodeLevelWeights); @@ -252,65 +255,6 @@ public float getShardBalance() { return shardBalanceFactor; } - /** - * This class is the primary weight function used to create balanced over nodes and shards in the cluster. - * Currently this function has 3 properties: - *
- * <ul>
- * <li><code>index balance</code> - balance property over shards per index</li>
- * <li><code>shard balance</code> - balance property over shards per cluster</li>
- * </ul>
- * <p>
- * Each of these properties are expressed as factor such that the properties factor defines the relative
- * importance of the property for the weight function. For example if the weight function should calculate
- * the weights only based on a global (shard) balance the index balance can be set to {@code 0.0} and will
- * in turn have no effect on the distribution.
- * </p>
- * The weight per index is calculated based on the following formula:
- * <ul>
- * <li><code>weight<sub>index</sub>(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index))</code></li>
- * <li><code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code></li>
- * </ul>
- * weight(node, index) = weightindex(node, index) + weightnode(node, index) - */ - private static class WeightFunction { - - private final float theta0; - private final float theta1; - private final float theta2; - private final float theta3; - - WeightFunction(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) { - float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance; - if (sum <= 0.0f) { - throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); - } - theta0 = shardBalance / sum; - theta1 = indexBalance / sum; - theta2 = writeLoadBalance / sum; - theta3 = diskUsageBalance / sum; - } - - float weight(Balancer balancer, ModelNode node, String index) { - final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); - return nodeWeight(balancer, node) + theta1 * weightIndex; - } - - float nodeWeight(Balancer balancer, ModelNode node) { - final float weightShard = node.numShards() - balancer.avgShardsPerNode(); - final float ingestLoad = (float) (node.writeLoad() - balancer.avgWriteLoadPerNode()); - final float diskUsage = (float) (node.diskUsageInBytes() - balancer.avgDiskUsageInBytesPerNode()); - return theta0 * weightShard + theta2 * ingestLoad + theta3 * diskUsage; - } - - float minWeightDelta(Balancer balancer, String index) { - return theta0 * 1 + theta1 * 1 + theta2 * balancer.getShardWriteLoad(index) + theta3 * balancer.maxShardSizeBytes(index); - } - } - /** * A {@link Balancer} */ @@ -335,63 +279,13 @@ private Balancer(WriteLoadForecaster writeLoadForecaster, RoutingAllocation allo this.metadata = allocation.metadata(); this.weight = weight; this.threshold = threshold; - avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); - avgWriteLoadPerNode = getTotalWriteLoad(writeLoadForecaster, metadata) / routingNodes.size(); - avgDiskUsageInBytesPerNode = ((double) getTotalDiskUsageInBytes(allocation.clusterInfo(), metadata) / routingNodes.size()); + avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes); + avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes); + avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes); nodes = Collections.unmodifiableMap(buildModelFromAssigned()); sorter = newNodeSorter(); } - private static double getTotalWriteLoad(WriteLoadForecaster writeLoadForecaster, Metadata metadata) { - double writeLoad = 0.0; - for (IndexMetadata indexMetadata : metadata.indices().values()) { - writeLoad += getIndexWriteLoad(writeLoadForecaster, indexMetadata); - } - return writeLoad; - } - - private static double getIndexWriteLoad(WriteLoadForecaster writeLoadForecaster, IndexMetadata indexMetadata) { - var shardWriteLoad = writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0); - return shardWriteLoad * numberOfCopies(indexMetadata); - } - - private static long getTotalDiskUsageInBytes(ClusterInfo clusterInfo, Metadata metadata) { - long totalDiskUsageInBytes = 0; - for (IndexMetadata indexMetadata : metadata.indices().values()) { - totalDiskUsageInBytes += getIndexDiskUsageInBytes(clusterInfo, indexMetadata); - } - return totalDiskUsageInBytes; - } - - // Visible for testing - static long getIndexDiskUsageInBytes(ClusterInfo clusterInfo, IndexMetadata indexMetadata) { - if (indexMetadata.ignoreDiskWatermarks()) { - // disk watermarks are ignored for partial searchable snapshots - 
// and is equivalent to indexMetadata.isPartialSearchableSnapshot() - return 0; - } - final long forecastedShardSize = indexMetadata.getForecastedShardSizeInBytes().orElse(-1L); - long totalSizeInBytes = 0; - int shardCount = 0; - for (int shard = 0; shard < indexMetadata.getNumberOfShards(); shard++) { - final ShardId shardId = new ShardId(indexMetadata.getIndex(), shard); - final long primaryShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, true, -1L)); - if (primaryShardSize != -1L) { - totalSizeInBytes += primaryShardSize; - shardCount++; - } - final long replicaShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, false, -1L)); - if (replicaShardSize != -1L) { - totalSizeInBytes += replicaShardSize * indexMetadata.getNumberOfReplicas(); - shardCount += indexMetadata.getNumberOfReplicas(); - } - } - if (shardCount == numberOfCopies(indexMetadata)) { - return totalSizeInBytes; - } - return shardCount == 0 ? 0 : (totalSizeInBytes / shardCount) * numberOfCopies(indexMetadata); - } - private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) { if (indexMetadata.ignoreDiskWatermarks()) { // disk watermarks are ignored for partial searchable snapshots @@ -401,10 +295,6 @@ private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMet return Math.max(indexMetadata.getForecastedShardSizeInBytes().orElse(0L), clusterInfo.getShardSize(shardRouting, 0L)); } - private static int numberOfCopies(IndexMetadata indexMetadata) { - return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas()); - } - private float getShardWriteLoad(String index) { return (float) writeLoadForecaster.getForecastedWriteLoad(metadata.index(index)).orElse(0.0); } @@ -1433,7 +1323,7 @@ public float weight(ModelNode node) { } public float minWeightDelta() { - return function.minWeightDelta(balancer, index); + return function.minWeightDelta(balancer.getShardWriteLoad(index), balancer.maxShardSizeBytes(index)); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java index 9de95804b49b2..6ad44fdf3a9c0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java @@ -21,7 +21,7 @@ * * @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated * @param weightsPerNode The node weights calculated based on - * {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.WeightFunction#nodeWeight} + * {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#nodeWeight} */ public record DesiredBalance( long lastConvergedIndex, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index cf8840dc95724..9f6487bdc8abd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -10,7 +10,7 @@ package org.elasticsearch.cluster.routing.allocation.allocator; import 
org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats; +import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider.NodeAllocationAndClusterBalanceStats; import org.elasticsearch.telemetry.metric.DoubleWithAttributes; import org.elasticsearch.telemetry.metric.LongWithAttributes; import org.elasticsearch.telemetry.metric.MeterRegistry; @@ -41,6 +41,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w public static final String DESIRED_BALANCE_NODE_DISK_USAGE_METRIC_NAME = "es.allocator.desired_balance.allocations.node_disk_usage_bytes.current"; + public static final String CURRENT_NODE_WEIGHT_METRIC_NAME = "es.allocator.allocations.node.weight.current"; public static final String CURRENT_NODE_SHARD_COUNT_METRIC_NAME = "es.allocator.allocations.node.shard_count.current"; public static final String CURRENT_NODE_WRITE_LOAD_METRIC_NAME = "es.allocator.allocations.node.write_load.current"; public static final String CURRENT_NODE_DISK_USAGE_METRIC_NAME = "es.allocator.allocations.node.disk_usage_bytes.current"; @@ -68,12 +69,13 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w private volatile long undesiredAllocations; private final AtomicReference> weightStatsPerNodeRef = new AtomicReference<>(Map.of()); - private final AtomicReference> allocationStatsPerNodeRef = new AtomicReference<>(Map.of()); + private final AtomicReference> allocationStatsPerNodeRef = + new AtomicReference<>(Map.of()); public void updateMetrics( AllocationStats allocationStats, Map weightStatsPerNode, - Map nodeAllocationStats + Map nodeAllocationStats ) { assert allocationStats != null : "allocation stats cannot be null"; assert weightStatsPerNode != null : "node balance weight stats cannot be null"; @@ -124,6 +126,12 @@ public DesiredBalanceMetrics(MeterRegistry meterRegistry) { "bytes", this::getDesiredBalanceNodeDiskUsageMetrics ); + meterRegistry.registerDoublesGauge( + CURRENT_NODE_WEIGHT_METRIC_NAME, + "The weight of nodes based on the current allocation state", + "unit", + this::getCurrentNodeWeightMetrics + ); meterRegistry.registerLongsGauge( DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME, "Shard count of nodes in the computed desired balance", @@ -291,6 +299,18 @@ private List getCurrentNodeUndesiredShardCountMetrics() { return values; } + private List getCurrentNodeWeightMetrics() { + if (nodeIsMaster == false) { + return List.of(); + } + var stats = allocationStatsPerNodeRef.get(); + List doubles = new ArrayList<>(stats.size()); + for (var node : stats.keySet()) { + doubles.add(new DoubleWithAttributes(stats.get(node).currentNodeWeight(), getNodeAttributes(node))); + } + return doubles; + } + private Map getNodeAttributes(DiscoveryNode node) { return Map.of("node_id", node.getId(), "node_name", node.getName()); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 5ad29debc8f20..2ee905634f760 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -20,8 +20,8 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import 
org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider; +import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider.NodeAllocationAndClusterBalanceStats; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics.AllocationStats; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -159,8 +159,13 @@ void run() { } private void updateDesireBalanceMetrics(AllocationStats allocationStats) { - var stats = nodeAllocationStatsProvider.stats(allocation.getClusterState(), allocation.clusterInfo(), desiredBalance); - Map nodeAllocationStats = new HashMap<>(stats.size()); + var stats = nodeAllocationStatsProvider.stats( + allocation.metadata(), + allocation.routingNodes(), + allocation.clusterInfo(), + desiredBalance + ); + Map nodeAllocationStats = new HashMap<>(stats.size()); for (var entry : stats.entrySet()) { var node = allocation.nodes().get(entry.getKey()); if (node != null) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java new file mode 100644 index 0000000000000..7203a92b147f6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; +import org.elasticsearch.index.shard.ShardId; + +/** + * This class is the primary weight function used to create balanced over nodes and shards in the cluster. + * Currently this function has 3 properties: + *
+ * <ul>
+ * <li><code>index balance</code> - balance property over shards per index</li>
+ * <li><code>shard balance</code> - balance property over shards per cluster</li>
+ * </ul>
+ * <p>
+ * Each of these properties are expressed as factor such that the properties factor defines the relative
+ * importance of the property for the weight function. For example if the weight function should calculate
+ * the weights only based on a global (shard) balance the index balance can be set to {@code 0.0} and will
+ * in turn have no effect on the distribution.
+ * </p>
+ * The weight per index is calculated based on the following formula:
+ * <ul>
+ * <li><code>weight<sub>index</sub>(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index))</code></li>
+ * <li><code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code></li>
+ * </ul>
+ * weight(node, index) = weightindex(node, index) + weightnode(node, index) + */ +public class WeightFunction { + + private final float theta0; + private final float theta1; + private final float theta2; + private final float theta3; + + public WeightFunction(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) { + float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance; + if (sum <= 0.0f) { + throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); + } + theta0 = shardBalance / sum; + theta1 = indexBalance / sum; + theta2 = writeLoadBalance / sum; + theta3 = diskUsageBalance / sum; + } + + float weight(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node, String index) { + final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); + final float nodeWeight = nodeWeight( + node.numShards(), + balancer.avgShardsPerNode(), + node.writeLoad(), + balancer.avgWriteLoadPerNode(), + node.diskUsageInBytes(), + balancer.avgDiskUsageInBytesPerNode() + ); + return nodeWeight + theta1 * weightIndex; + } + + public float nodeWeight( + int nodeNumShards, + float avgShardsPerNode, + double nodeWriteLoad, + double avgWriteLoadPerNode, + double diskUsageInBytes, + double avgDiskUsageInBytesPerNode + ) { + final float weightShard = nodeNumShards - avgShardsPerNode; + final float ingestLoad = (float) (nodeWriteLoad - avgWriteLoadPerNode); + final float diskUsage = (float) (diskUsageInBytes - avgDiskUsageInBytesPerNode); + return theta0 * weightShard + theta2 * ingestLoad + theta3 * diskUsage; + } + + float minWeightDelta(float shardWriteLoad, float shardSizeBytes) { + return theta0 * 1 + theta1 * 1 + theta2 * shardWriteLoad + theta3 * shardSizeBytes; + } + + public static float avgShardPerNode(Metadata metadata, RoutingNodes routingNodes) { + return ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); + } + + public static double avgWriteLoadPerNode(WriteLoadForecaster writeLoadForecaster, Metadata metadata, RoutingNodes routingNodes) { + return getTotalWriteLoad(writeLoadForecaster, metadata) / routingNodes.size(); + } + + public static double avgDiskUsageInBytesPerNode(ClusterInfo clusterInfo, Metadata metadata, RoutingNodes routingNodes) { + return ((double) getTotalDiskUsageInBytes(clusterInfo, metadata) / routingNodes.size()); + } + + private static double getTotalWriteLoad(WriteLoadForecaster writeLoadForecaster, Metadata metadata) { + double writeLoad = 0.0; + for (IndexMetadata indexMetadata : metadata.indices().values()) { + writeLoad += getIndexWriteLoad(writeLoadForecaster, indexMetadata); + } + return writeLoad; + } + + private static double getIndexWriteLoad(WriteLoadForecaster writeLoadForecaster, IndexMetadata indexMetadata) { + var shardWriteLoad = writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0); + return shardWriteLoad * numberOfCopies(indexMetadata); + } + + private static int numberOfCopies(IndexMetadata indexMetadata) { + return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas()); + } + + private static long getTotalDiskUsageInBytes(ClusterInfo clusterInfo, Metadata metadata) { + long totalDiskUsageInBytes = 0; + for (IndexMetadata indexMetadata : metadata.indices().values()) { + totalDiskUsageInBytes += getIndexDiskUsageInBytes(clusterInfo, indexMetadata); + } + return totalDiskUsageInBytes; + } + + // Visible for testing + static long getIndexDiskUsageInBytes(ClusterInfo 
clusterInfo, IndexMetadata indexMetadata) { + if (indexMetadata.ignoreDiskWatermarks()) { + // disk watermarks are ignored for partial searchable snapshots + // and is equivalent to indexMetadata.isPartialSearchableSnapshot() + return 0; + } + final long forecastedShardSize = indexMetadata.getForecastedShardSizeInBytes().orElse(-1L); + long totalSizeInBytes = 0; + int shardCount = 0; + for (int shard = 0; shard < indexMetadata.getNumberOfShards(); shard++) { + final ShardId shardId = new ShardId(indexMetadata.getIndex(), shard); + final long primaryShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, true, -1L)); + if (primaryShardSize != -1L) { + totalSizeInBytes += primaryShardSize; + shardCount++; + } + final long replicaShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, false, -1L)); + if (replicaShardSize != -1L) { + totalSizeInBytes += replicaShardSize * indexMetadata.getNumberOfReplicas(); + shardCount += indexMetadata.getNumberOfReplicas(); + } + } + if (shardCount == numberOfCopies(indexMetadata)) { + return totalSizeInBytes; + } + return shardCount == 0 ? 0 : (totalSizeInBytes / shardCount) * numberOfCopies(indexMetadata); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java index 0efa576a0cddc..35f1780464659 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java @@ -84,7 +84,7 @@ public void testShardStats() { clusterService, () -> clusterInfo, createShardAllocator(), - new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER) + new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings()) ); assertThat( service.stats(), @@ -125,7 +125,7 @@ public void testRelocatingShardIsOnlyCountedOnceOnTargetNode() { clusterService, EmptyClusterInfoService.INSTANCE, createShardAllocator(), - new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER) + new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings()) ); assertThat( service.stats(), @@ -182,7 +182,7 @@ public DesiredBalance getDesiredBalance() { ); } }, - new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER) + new NodeAllocationStatsProvider(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings()) ); assertThat( service.stats(), diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 98c3451329f52..412329e51a485 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -59,8 +59,8 @@ import static java.util.stream.Collectors.toSet; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder; -import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.getIndexDiskUsageInBytes; import static 
org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING; +import static org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction.getIndexDiskUsageInBytes; import static org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index a041efc9ad3f1..75cd6da44724d 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -19,12 +19,12 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodesHelper; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedShard; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; @@ -438,11 +438,13 @@ public void allocateUnassigned( } protected static final NodeAllocationStatsProvider EMPTY_NODE_ALLOCATION_STATS = new NodeAllocationStatsProvider( - WriteLoadForecaster.DEFAULT + WriteLoadForecaster.DEFAULT, + createBuiltInClusterSettings() ) { @Override - public Map stats( - ClusterState clusterState, + public Map stats( + Metadata metadata, + RoutingNodes routingNodes, ClusterInfo clusterInfo, @Nullable DesiredBalance desiredBalance ) { From bfe1aad78044d7adc864ad647e88462f8cdce150 Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Tue, 26 Nov 2024 16:47:25 +0100 Subject: [PATCH 2/5] Cleanup BucketsAggregator#rewriteBuckets (#114574) The array is initialized with the flag clearOnResize set to true so we don't need to set the values to 0 again. 
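
For reference, the contract relied on here is that BigArrays#newLongArray(long, boolean) with clearOnResize set to true returns an array whose slots already read as zero, which is what makes the removed docCounts.fill(0, newNumBuckets, 0) a no-op. A minimal standalone sketch of that contract, not part of the commit: the class name is invented for the example, and the non-recycling BigArrays instance is used only for simplicity.

    import org.elasticsearch.common.util.BigArrays;
    import org.elasticsearch.common.util.LongArray;

    public class ClearOnResizeSketch {
        public static void main(String[] args) {
            // clearOnResize = true: newly allocated slots are guaranteed to read as 0,
            // so no explicit fill is needed before accumulating into them.
            try (LongArray docCounts = BigArrays.NON_RECYCLING_INSTANCE.newLongArray(16, true)) {
                assert docCounts.get(7) == 0 : "freshly allocated slot already reads as zero";
                docCounts.increment(7, 42); // accumulate a doc count, as BucketsAggregator does
                assert docCounts.get(7) == 42;
            } // LongArray is Releasable, so try-with-resources frees it
        }
    }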
--- .../search/aggregations/bucket/BucketsAggregator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index ea667b821a7dd..665dd49e3381d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -105,7 +105,6 @@ public final void rewriteBuckets(long newNumBuckets, LongUnaryOperator mergeMap) try { docCounts = bigArrays().newLongArray(newNumBuckets, true); success = true; - docCounts.fill(0, newNumBuckets, 0); for (long i = 0; i < oldDocCounts.size(); i++) { long docCount = oldDocCounts.get(i); From 505c54eb94c71b694d44b8cf424be7ab5894e2e5 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Tue, 26 Nov 2024 16:59:54 +0100 Subject: [PATCH 3/5] Use feature flags in OperatorPrivilegesIT (#117491) Release runs fail for this suite because some of the actions listed are still behind a feature flag. Closes: https://github.com/elastic/elasticsearch/issues/102992 --- muted-tests.yml | 3 --- .../elasticsearch/xpack/security/operator/Constants.java | 8 +++++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 49898308e411b..1f092de410f8e 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -226,9 +226,6 @@ tests: - class: org.elasticsearch.xpack.inference.InferenceRestIT method: test {p0=inference/30_semantic_text_inference/Calculates embeddings using the default ELSER 2 endpoint} issue: https://github.com/elastic/elasticsearch/issues/117349 -- class: org.elasticsearch.xpack.security.operator.OperatorPrivilegesIT - method: testEveryActionIsEitherOperatorOnlyOrNonOperator - issue: https://github.com/elastic/elasticsearch/issues/102992 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_reset/Test reset running transform} issue: https://github.com/elastic/elasticsearch/issues/117473 diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index bfff63442281d..8df10037affdb 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.security.operator; +import org.elasticsearch.cluster.metadata.DataStream; + import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -508,9 +510,9 @@ public class Constants { "indices:admin/data_stream/lifecycle/get", "indices:admin/data_stream/lifecycle/put", "indices:admin/data_stream/lifecycle/explain", - "indices:admin/data_stream/options/delete", - "indices:admin/data_stream/options/get", - "indices:admin/data_stream/options/put", + DataStream.isFailureStoreFeatureFlagEnabled() ? "indices:admin/data_stream/options/delete" : null, + DataStream.isFailureStoreFeatureFlagEnabled() ? "indices:admin/data_stream/options/get" : null, + DataStream.isFailureStoreFeatureFlagEnabled() ? 
"indices:admin/data_stream/options/put" : null, "indices:admin/delete", "indices:admin/flush", "indices:admin/flush[s]", From f57c43cdf5ce8188cc66042b1a8adee420e91825 Mon Sep 17 00:00:00 2001 From: Oleksandr Kolomiiets Date: Tue, 26 Nov 2024 08:09:30 -0800 Subject: [PATCH 4/5] Include a link to downsampling a TSDS using DSL document (#117510) --- docs/reference/data-streams/tsds.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/reference/data-streams/tsds.asciidoc b/docs/reference/data-streams/tsds.asciidoc index 461c0a1272e96..d0d6d4a455c63 100644 --- a/docs/reference/data-streams/tsds.asciidoc +++ b/docs/reference/data-streams/tsds.asciidoc @@ -339,4 +339,5 @@ include::tsds-index-settings.asciidoc[] include::downsampling.asciidoc[] include::downsampling-ilm.asciidoc[] include::downsampling-manual.asciidoc[] +include::downsampling-dsl.asciidoc[] include::tsds-reindex.asciidoc[] From b22d185b7fca8147ec1cfcd993d7c803ce5a240e Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Tue, 26 Nov 2024 17:46:40 +0100 Subject: [PATCH 5/5] ES|QL: fix stats by constant expresson with alias (#117551) --- docs/changelog/117551.yaml | 5 + .../src/main/resources/stats.csv-spec | 12 ++ .../xpack/esql/action/EsqlCapabilities.java | 7 +- .../xpack/esql/session/EsqlSession.java | 2 +- .../session/IndexResolverFieldNamesTests.java | 108 ++++++++++++++++++ 5 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 docs/changelog/117551.yaml diff --git a/docs/changelog/117551.yaml b/docs/changelog/117551.yaml new file mode 100644 index 0000000000000..081dd9203d82a --- /dev/null +++ b/docs/changelog/117551.yaml @@ -0,0 +1,5 @@ +pr: 117551 +summary: Fix stats by constant expresson with alias +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec index 5562028a5935f..f95506ff1982f 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec @@ -2778,6 +2778,18 @@ m:integer | y+1:integer 11 | 12 ; +statsByConstantExpressionWithAliasAndSort +required_capability: fix_stats_by_foldable_expression_2 +FROM employees +| EVAL y = "a" +| STATS count = COUNT() BY x = y +| SORT x +; + +count:long | x:keyword +100 | a +; + filterIsAlwaysTrue required_capability: per_agg_filtering FROM employees diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 08fa7f0a9b213..3eaeceaa86564 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -531,7 +531,12 @@ public enum Cap { /** * support for aggregations on semantic_text */ - SEMANTIC_TEXT_AGGREGATIONS(EsqlCorePlugin.SEMANTIC_TEXT_FEATURE_FLAG); + SEMANTIC_TEXT_AGGREGATIONS(EsqlCorePlugin.SEMANTIC_TEXT_FEATURE_FLAG), + + /** + * Fix for https://github.com/elastic/elasticsearch/issues/114714, again + */ + FIX_STATS_BY_FOLDABLE_EXPRESSION_2,; private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 25bb6d80d0dd0..8f65914d1c30d 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -511,7 +511,7 @@ static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchF // remove any already discovered UnresolvedAttributes that are in fact aliases defined later down in the tree // for example "from test | eval x = salary | stats max = max(x) by gender" // remove the UnresolvedAttribute "x", since that is an Alias defined in "eval" - AttributeSet planRefs = Expressions.references(p.expressions()); + AttributeSet planRefs = p.references(); p.forEachExpressionDown(Alias.class, alias -> { // do not remove the UnresolvedAttribute that has the same name as its alias, ie "rename id = id" // or the UnresolvedAttributes that are used in Functions that have aliases "STATS id = MAX(id)" diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java index 5425f770c49e8..0fe89b24dfc6a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java @@ -353,6 +353,114 @@ public void testDocsStats() { | SORT languages""", Set.of("emp_no", "emp_no.*", "languages", "languages.*")); } + public void testEvalStats() { + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | STATS count = COUNT(*) BY y""", Set.of("_index")); + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | STATS count = COUNT(*) BY y + | SORT y""", Set.of("_index")); + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | STATS count = COUNT(*) BY x = y + | SORT x""", Set.of("_index")); + + assertFieldNames(""" + FROM employees + | STATS count = COUNT(*) BY first_name + | SORT first_name""", Set.of("first_name", "first_name.*")); + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | STATS count = COUNT(*) BY x = y + | SORT x, first_name""", Set.of("first_name", "first_name.*")); + + assertFieldNames(""" + FROM employees + | EVAL first_name = "a" + | STATS count = COUNT(*) BY first_name + | SORT first_name""", Set.of("_index")); + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | STATS count = COUNT(*) BY first_name = to_upper(y) + | SORT first_name""", Set.of("_index")); + + assertFieldNames(""" + FROM employees + | EVAL y = to_upper(first_name), z = "z" + | STATS count = COUNT(*) BY first_name = to_lower(y), z + | SORT first_name""", Set.of("first_name", "first_name.*")); + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | STATS count = COUNT(*) BY x = y, z = first_name + | SORT x, z""", Set.of("first_name", "first_name.*")); + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | STATS count = COUNT(*) BY x = y, first_name + | SORT x, first_name""", Set.of("first_name", "first_name.*")); + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | STATS count = COUNT(first_name) BY x = y + | SORT x + | DROP first_name""", Set.of("first_name", "first_name.*")); + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | STATS count = COUNT(*) BY x = y + | MV_EXPAND x""", Set.of("_index")); + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | STATS count = COUNT(*) BY first_name, y + | MV_EXPAND first_name""", Set.of("first_name", "first_name.*")); + + 
assertFieldNames(""" + FROM employees + | MV_EXPAND first_name + | EVAL y = "a" + | STATS count = COUNT(*) BY first_name, y + | SORT y""", Set.of("first_name", "first_name.*")); + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | MV_EXPAND y + | STATS count = COUNT(*) BY x = y + | SORT x""", Set.of("_index")); + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | STATS count = COUNT(*) BY x = y + | STATS count = COUNT(count) by x + | SORT x""", Set.of("_index")); + + assertFieldNames(""" + FROM employees + | EVAL y = "a" + | STATS count = COUNT(*) BY first_name, y + | STATS count = COUNT(count) by x = y + | SORT x""", Set.of("first_name", "first_name.*")); + } + public void testSortWithLimitOne_DropHeight() { assertFieldNames("from employees | sort languages | limit 1 | drop height*", ALL_FIELDS); }