From 61ba7b540db03630e2d9cbd17d46b52df67addba Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 22 Feb 2023 19:12:27 +0000 Subject: [PATCH] [Segment Replication] Revert backport/backport 6017 to 2.x (#6440) * Revert "[Segment Replication] Add changelog entry and fix allocation test (#6250) (#6270)" This reverts commit 726c068f3c8ec3e91b72f3ae71946fe7f33f3c35. * Revert "[Segment Replication] Introduce primary weight factor for primary shards distribution (#6017) (#6161)" This reverts commit 2e3e1ed7bb4e2fe6239a14fed8450931e90af45d. (cherry picked from commit e316959bc7f11e2f28d441293667f1caa6658f1e) Signed-off-by: github-actions[bot] --- CHANGELOG.md | 2 +- .../opensearch/common/collect/Triplet.java | 63 ---- .../SegmentReplicationAllocationIT.java | 307 ------------------ .../cluster/routing/RoutingNode.java | 17 - .../allocator/BalancedShardsAllocator.java | 67 +--- .../allocator/LocalShardsBalancer.java | 8 - .../allocation/allocator/ShardsBalancer.java | 11 +- .../org/opensearch/common/TriConsumer.java | 13 +- .../settings/AbstractScopedSettings.java | 23 -- .../common/settings/ClusterSettings.java | 1 - .../opensearch/common/settings/Setting.java | 56 ---- .../support/MultiTermsValuesSourceConfig.java | 2 +- .../support/MultiValuesSourceFieldConfig.java | 2 +- .../allocation/BalanceConfigurationTests.java | 177 +--------- .../common/settings/SettingTests.java | 127 -------- .../metrics/SumAggregatorTests.java | 2 +- 16 files changed, 25 insertions(+), 853 deletions(-) delete mode 100644 libs/common/src/main/java/org/opensearch/common/collect/Triplet.java delete mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 3abfa6d04fa1c..19a76be5e8b00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,12 +18,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add a guardrail to limit maximum number of shard on the cluster ([#6143](https://github.com/opensearch-project/OpenSearch/pull/6143)) - Add cancellation of in-flight SearchTasks based on resource consumption ([#5606](https://github.com/opensearch-project/OpenSearch/pull/5605)) - Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459)) -- [Segment Replication] Add primary weight factor for balanced primary distribution ([#6017](https://github.com/opensearch-project/OpenSearch/pull/6017)) - Add a setting to control auto release of OpenSearch managed index creation block ([#6277](https://github.com/opensearch-project/OpenSearch/pull/6277)) - Fix timeout error when adding a document to an index with extension running ([#6275](https://github.com/opensearch-project/OpenSearch/pull/6275)) - Handle translog upload during primary relocation for remote-backed indexes ([#5804](https://github.com/opensearch-project/OpenSearch/pull/5804)) - Batch translog sync/upload per x ms for remote-backed indexes ([#5854](https://github.com/opensearch-project/OpenSearch/pull/5854)) + ### Dependencies - Update nebula-publishing-plugin to 19.2.0 ([#5704](https://github.com/opensearch-project/OpenSearch/pull/5704)) - Bumps `reactor-netty` from 1.1.1 to 1.1.2 ([#5878](https://github.com/opensearch-project/OpenSearch/pull/5878)) diff --git a/libs/common/src/main/java/org/opensearch/common/collect/Triplet.java b/libs/common/src/main/java/org/opensearch/common/collect/Triplet.java deleted file mode 100644 index 9b7e8f6a419a6..0000000000000 --- a/libs/common/src/main/java/org/opensearch/common/collect/Triplet.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package org.opensearch.common.collect; - -import java.util.Objects; - -/** - * A container for 3 elements, similar to {@link org.opensearch.common.collect.Tuple} - * - * @opensearch.internal - */ -public class Triplet { - - public static Triplet tuple(V1 v1, V2 v2, V3 v3) { - return new Triplet<>(v1, v2, v3); - } - - private final V1 v1; - private final V2 v2; - - private final V3 v3; - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Triplet triplet = (Triplet) o; - return Objects.equals(v1, triplet.v1) && Objects.equals(v2, triplet.v2) && Objects.equals(v3, triplet.v3); - } - - @Override - public int hashCode() { - return Objects.hash(v1, v2, v3); - } - - public Triplet(V1 v1, V2 v2, V3 v3) { - this.v1 = v1; - this.v2 = v2; - this.v3 = v3; - } - - public V1 v1() { - return v1; - } - - public V2 v2() { - return v2; - } - - public V3 v3() { - return v3; - } - - @Override - public String toString() { - return "Tuple [v1=" + v1 + ", v2=" + v2 + ", v3=" + v3 + "]"; - } -} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java deleted file mode 100644 index fb72c4d578510..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java +++ /dev/null @@ -1,307 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.replication; - -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.routing.RoutingNode; -import org.opensearch.cluster.routing.RoutingNodes; -import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; -import org.opensearch.common.settings.Settings; -import org.opensearch.index.IndexModule; -import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.test.InternalTestCluster; -import org.opensearch.test.OpenSearchIntegTestCase; - -import java.util.ArrayList; -import java.util.Formatter; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.TimeUnit; - -import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SegmentReplicationAllocationIT extends SegmentReplicationBaseIT { - - private void createIndex(String idxName, int shardCount, int replicaCount, boolean isSegRep) { - Settings.Builder builder = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount) - .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCount); - if (isSegRep) { - builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); - } else { - builder = builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT); - } - prepareCreate(idxName, builder).get(); - } - - /** - * This test verifies primary shard allocation is balanced. - */ - public void testShardAllocation() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - final int maxReplicaCount = 2; - final int maxShardCount = 5; - final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10); - final int numberOfIndices = randomIntBetween(5, 10); - - final List nodeNames = new ArrayList<>(); - logger.info("--> Creating {} nodes", nodeCount); - for (int i = 0; i < nodeCount; i++) { - nodeNames.add(internalCluster().startNode()); - } - assertAcked( - client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings( - Settings.builder().put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0f") - ) - ); - - int shardCount, replicaCount, totalShardCount = 0, totalReplicaCount = 0; - ShardAllocations shardAllocations = new ShardAllocations(); - ClusterState state; - for (int i = 0; i < numberOfIndices; i++) { - shardCount = randomIntBetween(1, maxShardCount); - totalShardCount += shardCount; - replicaCount = randomIntBetween(0, maxReplicaCount); - totalReplicaCount += replicaCount; - createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); - logger.info("--> Creating index {} with shard count {} and replica count {}", "test" + i, shardCount, replicaCount); - assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS); - state = client().admin().cluster().prepareState().execute().actionGet().getState(); - shardAllocations.printShardDistribution(state); - } - state = client().admin().cluster().prepareState().execute().actionGet().getState(); - RoutingNodes nodes = state.getRoutingNodes(); - final float avgNumShards = (float) (totalShardCount) / (float) (nodes.size()); - final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f))); - final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f))); - - for (RoutingNode node : nodes) { - assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards); - assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards); - } - } - - /** - * This test verifies shard allocation with changes to cluster config i.e. node add, removal keeps the primary shard - * allocation balanced. - */ - public void testAllocationWithDisruption() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - final int maxReplicaCount = 2; - final int maxShardCount = 5; - final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10); - final int numberOfIndices = randomIntBetween(1, 10); - - logger.info("--> Creating {} nodes", nodeCount); - final List nodeNames = new ArrayList<>(); - for (int i = 0; i < nodeCount; i++) { - nodeNames.add(internalCluster().startNode()); - } - assertAcked( - client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings( - Settings.builder() - .put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), "1.0f") - .put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), "0.0f") - .put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), "0.0f") - .build() - ) - ); - - int shardCount, replicaCount, totalShardCount = 0, totalReplicaCount = 0; - ShardAllocations shardAllocations = new ShardAllocations(); - ClusterState state; - for (int i = 0; i < numberOfIndices; i++) { - shardCount = randomIntBetween(1, maxShardCount); - totalShardCount += shardCount; - replicaCount = randomIntBetween(1, maxReplicaCount); - totalReplicaCount += replicaCount; - logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount); - createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); - assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS); - if (logger.isTraceEnabled()) { - state = client().admin().cluster().prepareState().execute().actionGet().getState(); - shardAllocations.printShardDistribution(state); - } - } - state = client().admin().cluster().prepareState().execute().actionGet().getState(); - float avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size()); - int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f))); - int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f))); - - for (RoutingNode node : state.getRoutingNodes()) { - assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards); - assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards); - } - - final int additionalNodeCount = randomIntBetween(1, 5); - logger.info("--> Adding {} nodes", additionalNodeCount); - - internalCluster().startNodes(additionalNodeCount); - assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS); - state = client().admin().cluster().prepareState().execute().actionGet().getState(); - avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size()); - minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f))); - maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f))); - shardAllocations.printShardDistribution(state); - for (RoutingNode node : state.getRoutingNodes()) { - assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards); - assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards); - } - - logger.info("--> Stop one third nodes"); - for (int i = 1; i < nodeCount; i += 3) { - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames.get(i))); - // give replica a chance to promote as primary before terminating node containing the replica - assertBusy(() -> ensureGreen(), 60, TimeUnit.SECONDS); - } - state = client().admin().cluster().prepareState().execute().actionGet().getState(); - avgNumShards = (float) (totalShardCount) / (float) (state.getRoutingNodes().size()); - minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - 1.0f))); - maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + 1.0f))); - shardAllocations.printShardDistribution(state); - - for (RoutingNode node : state.getRoutingNodes()) { - assertTrue(node.primaryShardsWithState(STARTED).size() >= minAvgNumberOfShards); - assertTrue(node.primaryShardsWithState(STARTED).size() <= maxAvgNumberOfShards); - } - } - - /** - * This class is created for debugging purpose to show shard allocation across nodes. It keeps cluster state which - * is used to build the node's shard allocation - */ - private class ShardAllocations { - ClusterState state; - - public static final String separator = "==================================================="; - public static final String ONE_LINE_RETURN = "\n"; - public static final String TWO_LINE_RETURN = "\n\n"; - - /** - Store shard primary/replica shard count against a node for segrep indices. - String: NodeId - int[]: tuple storing primary shard count in 0th index and replica's in 1 - */ - TreeMap nodeToSegRepCountMap = new TreeMap<>(); - /** - Store shard primary/replica shard count against a node for docrep indices. - String: NodeId - int[]: tuple storing primary shard count in 0th index and replica's in 1 - */ - TreeMap nodeToDocRepCountMap = new TreeMap<>(); - - /** - * Helper map containing NodeName to NodeId - */ - TreeMap nameToNodeId = new TreeMap<>(); - - /* - Unassigned array containing primary at 0, replica at 1 - */ - int[] unassigned = new int[2]; - - int[] totalShards = new int[2]; - - public final String printShardAllocationWithHeader(int[] docrep, int[] segrep) { - StringBuffer sb = new StringBuffer(); - Formatter formatter = new Formatter(sb, Locale.getDefault()); - formatter.format("%-20s %-20s %-20s %-20s\n", "P", docrep[0] + segrep[0], docrep[0], segrep[0]); - formatter.format("%-20s %-20s %-20s %-20s\n", "R", docrep[1] + segrep[1], docrep[1], segrep[1]); - return sb.toString(); - } - - public void reset() { - nodeToSegRepCountMap.clear(); - nodeToDocRepCountMap.clear(); - nameToNodeId.clear(); - totalShards[0] = totalShards[1] = 0; - unassigned[0] = unassigned[1] = 0; - } - - public void setState(ClusterState state) { - this.reset(); - this.state = state; - buildMap(); - } - - private void buildMap() { - for (RoutingNode node : state.getRoutingNodes()) { - nameToNodeId.putIfAbsent(node.node().getName(), node.nodeId()); - nodeToSegRepCountMap.putIfAbsent(node.nodeId(), new int[] { 0, 0 }); - nodeToDocRepCountMap.putIfAbsent(node.nodeId(), new int[] { 0, 0 }); - } - for (ShardRouting shardRouting : state.routingTable().allShards()) { - // Fetch shard to update. Initialize local array - if (isIndexSegRep(shardRouting.getIndexName())) { - updateMap(nodeToSegRepCountMap, shardRouting); - } else { - updateMap(nodeToDocRepCountMap, shardRouting); - } - } - } - - void updateMap(TreeMap mapToUpdate, ShardRouting shardRouting) { - int[] shard; - shard = shardRouting.assignedToNode() ? mapToUpdate.get(shardRouting.currentNodeId()) : unassigned; - // Update shard type count - if (shardRouting.primary()) { - shard[0]++; - totalShards[0]++; - } else { - shard[1]++; - totalShards[1]++; - } - // For assigned shards, put back counter - if (shardRouting.assignedToNode()) mapToUpdate.put(shardRouting.currentNodeId(), shard); - } - - boolean isIndexSegRep(String indexName) { - return state.metadata() - .index(indexName) - .getSettings() - .get(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey()) - .equals(ReplicationType.SEGMENT.toString()); - } - - @Override - public String toString() { - StringBuffer sb = new StringBuffer(); - sb.append(TWO_LINE_RETURN + separator + ONE_LINE_RETURN); - Formatter formatter = new Formatter(sb, Locale.getDefault()); - for (Map.Entry entry : nameToNodeId.entrySet()) { - String nodeId = nameToNodeId.get(entry.getKey()); - formatter.format("%-20s %-20s %-20s %-20s\n", entry.getKey().toUpperCase(Locale.getDefault()), "TOTAL", "DOCREP", "SEGREP"); - sb.append(printShardAllocationWithHeader(nodeToDocRepCountMap.get(nodeId), nodeToSegRepCountMap.get(nodeId))); - } - sb.append(ONE_LINE_RETURN); - formatter.format("%-20s %-20s %-20s\n\n", "Unassigned ", unassigned[0], unassigned[1]); - formatter.format("%-20s %-20s %-20s\n\n", "Total Shards", totalShards[0], totalShards[1]); - return sb.toString(); - } - - public void printShardDistribution(ClusterState state) { - this.setState(state); - logger.info("--> Shard distribution {}", this); - } - } - -} diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java index d23b4856eced0..413ddff72f7a5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java @@ -329,23 +329,6 @@ public List shardsWithState(ShardRoutingState... states) { return shards; } - /** - * Determine the primary shards of an index with a specific state - * @param states set of states which should be listed - * @return a list of shards - */ - public List primaryShardsWithState(ShardRoutingState... states) { - List shards = new ArrayList<>(); - for (ShardRouting shardEntry : this) { - for (ShardRoutingState state : states) { - if (shardEntry.state() == state && shardEntry.primary() == true) { - shards.add(shardEntry); - } - } - } - return shards; - } - /** * Determine the shards of an index with a specific state * @param index id of the index diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 8893aaa54799a..d8761e9b1a78e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -107,14 +107,6 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); - public static final Setting PRIMARY_SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting( - "cluster.routing.allocation.balance.primary", - 0.0f, - 0.0f, - Property.Dynamic, - Property.NodeScope - ); - private volatile boolean movePrimaryFirst; private volatile WeightFunction weightFunction; private volatile float threshold; @@ -125,19 +117,10 @@ public BalancedShardsAllocator(Settings settings) { @Inject public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { - setWeightFunction( - INDEX_BALANCE_FACTOR_SETTING.get(settings), - SHARD_BALANCE_FACTOR_SETTING.get(settings), - PRIMARY_SHARD_BALANCE_FACTOR_SETTING.get(settings) - ); + setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings)); setThreshold(THRESHOLD_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); - clusterSettings.addSettingsUpdateConsumer( - INDEX_BALANCE_FACTOR_SETTING, - SHARD_BALANCE_FACTOR_SETTING, - PRIMARY_SHARD_BALANCE_FACTOR_SETTING, - this::setWeightFunction - ); + clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); } @@ -145,8 +128,8 @@ private void setMovePrimaryFirst(boolean movePrimaryFirst) { this.movePrimaryFirst = movePrimaryFirst; } - private void setWeightFunction(float indexBalance, float shardBalanceFactor, float primaryShardBalance) { - weightFunction = new WeightFunction(indexBalance, shardBalanceFactor, primaryShardBalance); + private void setWeightFunction(float indexBalance, float shardBalanceFactor) { + weightFunction = new WeightFunction(indexBalance, shardBalanceFactor); } private void setThreshold(float threshold) { @@ -269,22 +252,17 @@ static class WeightFunction { private final float shardBalance; private final float theta0; private final float theta1; - private final float theta2; - private final float primaryShardBalance; private AllocationConstraints constraints; - WeightFunction(float indexBalance, float shardBalance, float primaryShardBalance) { - float sum = indexBalance + shardBalance + primaryShardBalance; + WeightFunction(float indexBalance, float shardBalance) { + float sum = indexBalance + shardBalance; if (sum <= 0.0f) { throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); } theta0 = shardBalance / sum; theta1 = indexBalance / sum; - theta2 = primaryShardBalance / sum; - this.indexBalance = indexBalance; this.shardBalance = shardBalance; - this.primaryShardBalance = primaryShardBalance; this.constraints = new AllocationConstraints(); } @@ -296,9 +274,7 @@ public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode float weight(ShardsBalancer balancer, ModelNode node, String index) { final float weightShard = node.numShards() - balancer.avgShardsPerNode(); final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); - final float primaryWeightShard = node.numPrimaryShards() - balancer.avgPrimaryShardsPerNode(); - - return theta0 * weightShard + theta1 * weightIndex + theta2 * primaryWeightShard; + return theta0 * weightShard + theta1 * weightIndex; } } @@ -312,8 +288,6 @@ public static class ModelNode implements Iterable { private int numShards = 0; private final RoutingNode routingNode; - private int primaryNumShards = 0; - ModelNode(RoutingNode routingNode) { this.routingNode = routingNode; } @@ -339,10 +313,6 @@ public int numShards(String idx) { return index == null ? 0 : index.numShards(); } - public int numPrimaryShards() { - return primaryNumShards; - } - public int highestPrimary(String index) { ModelIndex idx = indices.get(index); if (idx != null) { @@ -359,9 +329,6 @@ public void addShard(ShardRouting shard) { } index.addShard(shard); numShards++; - if (shard.primary()) { - primaryNumShards++; - } } public void removeShard(ShardRouting shard) { @@ -372,9 +339,6 @@ public void removeShard(ShardRouting shard) { indices.remove(shard.getIndexName()); } } - if (shard.primary()) { - primaryNumShards--; - } numShards--; } @@ -417,14 +381,13 @@ public Balancer( } /** - * A model index that stores info about specific index + * A model index. * * @opensearch.internal */ static final class ModelIndex implements Iterable { private final String id; private final Set shards = new HashSet<>(4); // expect few shards of same index to be allocated on same node - private final Set primaryShards = new HashSet<>(); private int highestPrimary = -1; ModelIndex(String id) { @@ -452,10 +415,6 @@ public int numShards() { return shards.size(); } - public int numPrimaryShards() { - return primaryShards.size(); - } - @Override public Iterator iterator() { return shards.iterator(); @@ -464,20 +423,12 @@ public Iterator iterator() { public void removeShard(ShardRouting shard) { highestPrimary = -1; assert shards.contains(shard) : "Shard not allocated on current node: " + shard; - if (shard.primary()) { - assert primaryShards.contains(shard) : "Primary shard not allocated on current node: " + shard; - primaryShards.remove(shard); - } shards.remove(shard); } public void addShard(ShardRouting shard) { highestPrimary = -1; - assert shards.contains(shard) == false : "Shard already allocated on current node: " + shard; - if (shard.primary()) { - assert primaryShards.contains(shard) == false : "Primary shard already allocated on current node: " + shard; - primaryShards.add(shard); - } + assert !shards.contains(shard) : "Shard already allocated on current node: " + shard; shards.add(shard); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 738c6588357f5..3c5e4013748af 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -63,7 +63,6 @@ public class LocalShardsBalancer extends ShardsBalancer { private final float threshold; private final Metadata metadata; private final float avgShardsPerNode; - private final float avgPrimaryShardsPerNode; private final BalancedShardsAllocator.NodeSorter sorter; private final Set inEligibleTargetNode; @@ -82,8 +81,6 @@ public LocalShardsBalancer( this.routingNodes = allocation.routingNodes(); this.metadata = allocation.metadata(); avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); - final int shardCount = StreamSupport.stream(metadata.spliterator(), false).mapToInt(IndexMetadata::getNumberOfShards).sum(); - avgPrimaryShardsPerNode = (float) shardCount / routingNodes.size(); nodes = Collections.unmodifiableMap(buildModelFromAssigned()); sorter = newNodeSorter(); inEligibleTargetNode = new HashSet<>(); @@ -104,11 +101,6 @@ public float avgShardsPerNode(String index) { return ((float) metadata.index(index).getTotalNumberOfShards()) / nodes.size(); } - @Override - public float avgPrimaryShardsPerNode() { - return avgPrimaryShardsPerNode; - } - /** * Returns the global average of shards per node */ diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java index 9774f84a4cd91..593e6998141fb 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java @@ -60,23 +60,16 @@ public abstract class ShardsBalancer { abstract MoveDecision decideRebalance(ShardRouting shardRouting); /** - * Returns the average of shards per node + * Returns the average of shards per node for the given index */ public float avgShardsPerNode() { return Float.MAX_VALUE; } /** - * Returns the global average of shards per node for the given index + * Returns the global average of shards per node */ public float avgShardsPerNode(String index) { return Float.MAX_VALUE; } - - /** - * Returns the average of primary shards per node - */ - public float avgPrimaryShardsPerNode() { - return Float.MAX_VALUE; - } } diff --git a/server/src/main/java/org/opensearch/common/TriConsumer.java b/server/src/main/java/org/opensearch/common/TriConsumer.java index a174499d58628..f98276b6d007d 100644 --- a/server/src/main/java/org/opensearch/common/TriConsumer.java +++ b/server/src/main/java/org/opensearch/common/TriConsumer.java @@ -32,8 +32,6 @@ package org.opensearch.common; -import java.util.Objects; - /** * Represents an operation that accepts three arguments and returns no result. * @@ -52,14 +50,5 @@ public interface TriConsumer { * @param t the second function argument * @param u the third function argument */ - void accept(S s, T t, U u); - - default TriConsumer andThen(TriConsumer after) { - Objects.requireNonNull(after); - - return (l, r, s) -> { - accept(l, r, s); - after.accept(l, r, s); - }; - } + void apply(S s, T t, U u); } diff --git a/server/src/main/java/org/opensearch/common/settings/AbstractScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/AbstractScopedSettings.java index 50f6714ec41b9..8a19d309975df 100644 --- a/server/src/main/java/org/opensearch/common/settings/AbstractScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/AbstractScopedSettings.java @@ -38,7 +38,6 @@ import org.apache.lucene.search.spell.LevenshteinDistance; import org.apache.lucene.util.CollectionUtil; import org.opensearch.ExceptionsHelper; -import org.opensearch.common.TriConsumer; import org.opensearch.common.collect.Tuple; import org.opensearch.common.regex.Regex; @@ -445,28 +444,6 @@ public synchronized void addSettingsUpdateConsumer(Setting a, Setting< addSettingsUpdateConsumer(a, b, consumer, (i, j) -> {}); } - /** - * Adds a settings consumer that accepts the values for three settings. The consumer is only notified if any one of - * the settings changed and if the provided validator succeeded. - *

- * Note: Only settings registered in {@link SettingsModule} can be changed dynamically. - *

- * This method registers a compound updater that is useful if three settings depends on each other. - * The consumer is always provided with both values even if only one of the two changes. - */ - public synchronized void addSettingsUpdateConsumer(Setting
a, Setting b, Setting c, TriConsumer consumer) { - if (a != get(a.getKey())) { - throw new IllegalArgumentException("Setting is not registered for key [" + a.getKey() + "]"); - } - if (b != get(b.getKey())) { - throw new IllegalArgumentException("Setting is not registered for key [" + b.getKey() + "]"); - } - if (c != get(c.getKey())) { - throw new IllegalArgumentException("Setting is not registered for key [" + c.getKey() + "]"); - } - addSettingsUpdater(Setting.compoundUpdater(consumer, (i, j, k) -> {}, a, b, c, logger)); - } - /** * Adds a settings consumer that accepts the values for two settings. The consumer is only notified if one or both settings change * and if the provided validator succeeded. diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index dd7c790ee05c6..139e09465f89f 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -231,7 +231,6 @@ public void apply(Settings value, Settings current, Settings previous) { AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, - BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, diff --git a/server/src/main/java/org/opensearch/common/settings/Setting.java b/server/src/main/java/org/opensearch/common/settings/Setting.java index aaea633caaf60..176f24e3cc1ff 100644 --- a/server/src/main/java/org/opensearch/common/settings/Setting.java +++ b/server/src/main/java/org/opensearch/common/settings/Setting.java @@ -39,8 +39,6 @@ import org.opensearch.common.Booleans; import org.opensearch.common.Nullable; import org.opensearch.common.Strings; -import org.opensearch.common.TriConsumer; -import org.opensearch.common.collect.Triplet; import org.opensearch.common.collect.Tuple; import org.opensearch.common.regex.Regex; import org.opensearch.common.unit.ByteSizeValue; @@ -724,60 +722,6 @@ public String toString() { }; } - /** - * Updates settings that depend on each other. - * - * See {@link AbstractScopedSettings#addSettingsUpdateConsumer(Setting, Setting, Setting, TriConsumer)} and its usage for details. - */ - static AbstractScopedSettings.SettingUpdater> compoundUpdater( - final TriConsumer consumer, - final TriConsumer validator, - final Setting aSetting, - final Setting bSetting, - final Setting cSetting, - Logger logger - ) { - final AbstractScopedSettings.SettingUpdater aSettingUpdater = aSetting.newUpdater(null, logger); - final AbstractScopedSettings.SettingUpdater bSettingUpdater = bSetting.newUpdater(null, logger); - final AbstractScopedSettings.SettingUpdater cSettingUpdater = cSetting.newUpdater(null, logger); - return new AbstractScopedSettings.SettingUpdater>() { - @Override - public boolean hasChanged(Settings current, Settings previous) { - return aSettingUpdater.hasChanged(current, previous) - || bSettingUpdater.hasChanged(current, previous) - || cSettingUpdater.hasChanged(current, previous); - } - - @Override - public Triplet getValue(Settings current, Settings previous) { - A valueA = aSettingUpdater.getValue(current, previous); - B valueB = bSettingUpdater.getValue(current, previous); - C valueC = cSettingUpdater.getValue(current, previous); - validator.accept(valueA, valueB, valueC); - return new Triplet<>(valueA, valueB, valueC); - } - - @Override - public void apply(Triplet value, Settings current, Settings previous) { - if (aSettingUpdater.hasChanged(current, previous)) { - logSettingUpdate(aSetting, current, previous, logger); - } - if (bSettingUpdater.hasChanged(current, previous)) { - logSettingUpdate(bSetting, current, previous, logger); - } - if (cSettingUpdater.hasChanged(current, previous)) { - logSettingUpdate(cSetting, current, previous, logger); - } - consumer.accept(value.v1(), value.v2(), value.v3()); - } - - @Override - public String toString() { - return "CompoundUpdater for: " + aSettingUpdater + " and " + bSettingUpdater + " and " + cSettingUpdater; - } - }; - } - static AbstractScopedSettings.SettingUpdater groupedSettingsUpdater( Consumer consumer, final List> configuredSettings diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/MultiTermsValuesSourceConfig.java b/server/src/main/java/org/opensearch/search/aggregations/support/MultiTermsValuesSourceConfig.java index e1c6ac4150a5d..b0c828ca6b902 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/MultiTermsValuesSourceConfig.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/MultiTermsValuesSourceConfig.java @@ -57,7 +57,7 @@ ObjectParser apply( MultiTermsValuesSourceConfig.Builder::new ); - BaseMultiValuesSourceFieldConfig.PARSER.accept(parser, scriptable, timezoneAware); + BaseMultiValuesSourceFieldConfig.PARSER.apply(parser, scriptable, timezoneAware); if (valueTypeHinted) { parser.declareField( diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/MultiValuesSourceFieldConfig.java b/server/src/main/java/org/opensearch/search/aggregations/support/MultiValuesSourceFieldConfig.java index 3e2976a25f749..63fce83369c18 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/MultiValuesSourceFieldConfig.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/MultiValuesSourceFieldConfig.java @@ -69,7 +69,7 @@ public class MultiValuesSourceFieldConfig extends BaseMultiValuesSourceFieldConf MultiValuesSourceFieldConfig.Builder::new ); - BaseMultiValuesSourceFieldConfig.PARSER.accept(parser, scriptable, timezoneAware); + BaseMultiValuesSourceFieldConfig.PARSER.apply(parser, scriptable, timezoneAware); if (filtered) { parser.declareField( diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index 3762e137ac8da..1ba69694eaec1 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -75,7 +75,7 @@ public class BalanceConfigurationTests extends OpenSearchAllocationTestCase { public void testIndexBalance() { /* Tests balance over indices only */ final float indexBalance = 1.0f; - final float shardBalance = 0.0f; + final float replicaBalance = 0.0f; final float balanceThreshold = 1.0f; Settings.Builder settings = Settings.builder(); @@ -84,7 +84,7 @@ public void testIndexBalance() { ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() ); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); - settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); @@ -123,148 +123,10 @@ public void testIndexBalance() { ); } - /** - * This test verifies that with only primary shard balance, the primary shard distribution is balanced within thresholds. - */ - public void testPrimaryBalance() { - /* Tests balance over primary shards only */ - final float indexBalance = 0.0f; - final float shardBalance = 0.0f; - final float primaryBalance = 1.0f; - final float balanceThreshold = 1.0f; - - Settings.Builder settings = Settings.builder(); - settings.put( - ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), - ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() - ); - settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); - settings.put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), primaryBalance); - settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); - settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); - - AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); - - ClusterState clusterState = initCluster(strategy); - assertPrimaryBalance( - clusterState.getRoutingTable(), - clusterState.getRoutingNodes(), - numberOfNodes, - numberOfIndices, - numberOfReplicas, - numberOfShards, - balanceThreshold - ); - - clusterState = addNode(clusterState, strategy); - assertPrimaryBalance( - clusterState.getRoutingTable(), - clusterState.getRoutingNodes(), - numberOfNodes + 1, - numberOfIndices, - numberOfReplicas, - numberOfShards, - balanceThreshold - ); - - clusterState = removeNodes(clusterState, strategy); - assertPrimaryBalance( - clusterState.getRoutingTable(), - clusterState.getRoutingNodes(), - (numberOfNodes + 1) - (numberOfNodes + 1) / 2, - numberOfIndices, - numberOfReplicas, - numberOfShards, - balanceThreshold - ); - } - - /** - * This test verifies - */ - public void testBalanceDefaults() { - final float indexBalance = 0.55f; - final float shardBalance = 0.45f; - final float primaryBalance = 0.40f; - final float balanceThreshold = 1.0f; - - Settings.Builder settings = Settings.builder(); - settings.put( - ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), - ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() - ); - settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); - settings.put(BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_FACTOR_SETTING.getKey(), primaryBalance); - settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); - settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); - - AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); - - ClusterState clusterState = initCluster(strategy); - assertPrimaryBalance( - clusterState.getRoutingTable(), - clusterState.getRoutingNodes(), - numberOfNodes, - numberOfIndices, - numberOfReplicas, - numberOfShards, - balanceThreshold - ); - assertIndexBalance( - clusterState.getRoutingTable(), - clusterState.getRoutingNodes(), - numberOfNodes, - numberOfIndices, - numberOfReplicas, - numberOfShards, - balanceThreshold - ); - - clusterState = addNode(clusterState, strategy); - assertPrimaryBalance( - clusterState.getRoutingTable(), - clusterState.getRoutingNodes(), - numberOfNodes + 1, - numberOfIndices, - numberOfReplicas, - numberOfShards, - balanceThreshold - ); - assertIndexBalance( - clusterState.getRoutingTable(), - clusterState.getRoutingNodes(), - numberOfNodes + 1, - numberOfIndices, - numberOfReplicas, - numberOfShards, - balanceThreshold - ); - - clusterState = removeNodes(clusterState, strategy); - assertPrimaryBalance( - clusterState.getRoutingTable(), - clusterState.getRoutingNodes(), - (numberOfNodes + 1) - (numberOfNodes + 1) / 2, - numberOfIndices, - numberOfReplicas, - numberOfShards, - balanceThreshold - ); - assertIndexBalance( - clusterState.getRoutingTable(), - clusterState.getRoutingNodes(), - (numberOfNodes + 1) - (numberOfNodes + 1) / 2, - numberOfIndices, - numberOfReplicas, - numberOfShards, - balanceThreshold - ); - } - - public void testShardBalance() { + public void testReplicaBalance() { /* Tests balance over replicas only */ final float indexBalance = 0.0f; - final float shardBalance = 1.0f; + final float replicaBalance = 1.0f; final float balanceThreshold = 1.0f; Settings.Builder settings = Settings.builder(); @@ -273,13 +135,13 @@ public void testShardBalance() { ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() ); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); - settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); + settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); ClusterState clusterState = initCluster(strategy); - assertShardBalance( + assertReplicaBalance( clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, @@ -289,7 +151,7 @@ public void testShardBalance() { ); clusterState = addNode(clusterState, strategy); - assertShardBalance( + assertReplicaBalance( clusterState.getRoutingNodes(), numberOfNodes + 1, numberOfIndices, @@ -299,7 +161,7 @@ public void testShardBalance() { ); clusterState = removeNodes(clusterState, strategy); - assertShardBalance( + assertReplicaBalance( clusterState.getRoutingNodes(), numberOfNodes + 1 - (numberOfNodes + 1) / 2, numberOfIndices, @@ -392,7 +254,7 @@ private ClusterState removeNodes(ClusterState clusterState, AllocationService st return applyStartedShardsUntilNoChange(clusterState, strategy); } - private void assertShardBalance( + private void assertReplicaBalance( RoutingNodes nodes, int numberOfNodes, int numberOfIndices, @@ -447,27 +309,6 @@ private void assertIndexBalance( } } - private void assertPrimaryBalance( - RoutingTable routingTable, - RoutingNodes nodes, - int numberOfNodes, - int numberOfIndices, - int numberOfReplicas, - int numberOfShards, - float threshold - ) { - - final int numShards = numberOfShards * numberOfIndices; - final float avgNumShards = (float) (numShards) / (float) (numberOfNodes); - final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - threshold))); - final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + threshold))); - - for (RoutingNode node : nodes) { - assertThat(node.primaryShardsWithState(STARTED).size(), Matchers.greaterThanOrEqualTo(minAvgNumberOfShards)); - assertThat(node.primaryShardsWithState(STARTED).size(), Matchers.lessThanOrEqualTo(maxAvgNumberOfShards)); - } - } - public void testPersistedSettings() { Settings.Builder settings = Settings.builder(); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), 0.2); diff --git a/server/src/test/java/org/opensearch/common/settings/SettingTests.java b/server/src/test/java/org/opensearch/common/settings/SettingTests.java index 45810b421c027..7703cb394397e 100644 --- a/server/src/test/java/org/opensearch/common/settings/SettingTests.java +++ b/server/src/test/java/org/opensearch/common/settings/SettingTests.java @@ -36,7 +36,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.LogEvent; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.collect.Triplet; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.AbstractScopedSettings.SettingUpdater; import org.opensearch.common.settings.Setting.Property; @@ -626,28 +625,6 @@ public void validate(Integer a, Integer b) { } } - // This test class is used to verify behavior of BalancedShardAllocator.WeightFunction and ensure set function is called - // whenever there is a change in any of the settings. - public static class TriSettingConsumer { - - private Integer b; - private Integer a; - - private Integer c; - - public void set(Integer a, Integer b, Integer c) { - this.a = a; - this.b = b; - this.c = c; - } - - public void validate(Integer a, Integer b, Integer c) { - if (Integer.signum(a) != Integer.signum(b) || Integer.signum(a) != Integer.signum(c)) { - throw new IllegalArgumentException("boom"); - } - } - } - public void testComposite() { Composite c = new Composite(); Setting a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope); @@ -712,110 +689,6 @@ public void testCompositeValidator() { } - public void testTriSettingConsumer() { - TriSettingConsumer consumer = new TriSettingConsumer(); - Setting a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope); - Setting b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope); - Setting c = Setting.intSetting("foo.int.bar.c", 1, Property.Dynamic, Property.NodeScope); - ClusterSettings.SettingUpdater> settingUpdater = Setting.compoundUpdater( - consumer::set, - consumer::validate, - a, - b, - c, - logger - ); - assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY)); - assertNull(consumer.a); - assertNull(consumer.b); - assertNull(consumer.c); - - Settings build = Settings.builder().put("foo.int.bar.a", 2).build(); - assertTrue(settingUpdater.apply(build, Settings.EMPTY)); - assertEquals(2, consumer.a.intValue()); - assertEquals(1, consumer.b.intValue()); - assertEquals(1, consumer.c.intValue()); - - Integer aValue = consumer.a; - assertFalse(settingUpdater.apply(build, build)); - assertSame(aValue, consumer.a); - Settings previous = build; - build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).build(); - assertTrue(settingUpdater.apply(build, previous)); - assertEquals(2, consumer.a.intValue()); - assertEquals(5, consumer.b.intValue()); - - Integer bValue = consumer.b; - assertFalse(settingUpdater.apply(build, build)); - assertSame(bValue, consumer.b); - previous = build; - build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).put("foo.int.bar.c", 10).build(); - assertTrue(settingUpdater.apply(build, previous)); - assertEquals(2, consumer.a.intValue()); - assertEquals(5, consumer.b.intValue()); - assertEquals(10, consumer.c.intValue()); - - // reset to default - assertTrue(settingUpdater.apply(Settings.EMPTY, build)); - assertEquals(1, consumer.a.intValue()); - assertEquals(1, consumer.b.intValue()); - assertEquals(1, consumer.c.intValue()); - } - - public void testTriSettingConsumerValidator() { - TriSettingConsumer consumer = new TriSettingConsumer(); - Setting a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope); - Setting b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope); - Setting c = Setting.intSetting("foo.int.bar.c", 1, Property.Dynamic, Property.NodeScope); - ClusterSettings.SettingUpdater> settingUpdater = Setting.compoundUpdater( - consumer::set, - consumer::validate, - a, - b, - c, - logger - ); - assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY)); - assertNull(consumer.a); - assertNull(consumer.b); - assertNull(consumer.c); - - Settings build = Settings.builder().put("foo.int.bar.a", 2).build(); - assertTrue(settingUpdater.apply(build, Settings.EMPTY)); - assertEquals(2, consumer.a.intValue()); - assertEquals(1, consumer.b.intValue()); - assertEquals(1, consumer.c.intValue()); - - Integer aValue = consumer.a; - assertFalse(settingUpdater.apply(build, build)); - assertSame(aValue, consumer.a); - final Settings previous = build; - build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).build(); - assertTrue(settingUpdater.apply(build, previous)); - assertEquals(2, consumer.a.intValue()); - assertEquals(5, consumer.b.intValue()); - - Integer bValue = consumer.b; - assertFalse(settingUpdater.apply(build, build)); - assertSame(bValue, consumer.b); - final Settings previous2 = build; - build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).put("foo.int.bar.c", 10).build(); - assertTrue(settingUpdater.apply(build, previous)); - assertEquals(2, consumer.a.intValue()); - assertEquals(5, consumer.b.intValue()); - assertEquals(10, consumer.c.intValue()); - - Settings invalid = Settings.builder().put("foo.int.bar.a", -2).put("foo.int.bar.b", 5).build(); - IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () -> settingUpdater.apply(invalid, previous2)); - assertThat(exc.getMessage(), equalTo("boom")); - - // reset to default - assertTrue(settingUpdater.apply(Settings.EMPTY, build)); - assertEquals(1, consumer.a.intValue()); - assertEquals(1, consumer.b.intValue()); - assertEquals(1, consumer.c.intValue()); - } - public void testListSettingsDeprecated() { final Setting> deprecatedListSetting = Setting.listSetting( "foo.deprecated", diff --git a/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java index b38d4eee850ef..8c0087ca0b87d 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/metrics/SumAggregatorTests.java @@ -385,7 +385,7 @@ private void sumRandomDocsTestCase( builder, new MatchAllDocsQuery(), writer -> writer.addDocuments(docs), - internalSum -> verify.accept(finalSum, docs, internalSum), + internalSum -> verify.apply(finalSum, docs, internalSum), fieldType ); }