diff --git a/CHANGELOG.md b/CHANGELOG.md index 6826ef2d1f58e..744ee03c1f5e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644)) - Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763)) - Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606)) +- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335)) ### Security diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java index a05938c176678..7999faece52ca 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java @@ -247,11 +247,17 @@ void balance() { final Map nodePrimaryShardCount = calculateNodePrimaryShardCount(remoteRoutingNodes); int totalPrimaryShardCount = nodePrimaryShardCount.values().stream().reduce(0, Integer::sum); - totalPrimaryShardCount += routingNodes.unassigned().getNumPrimaries(); - int avgPrimaryPerNode = (totalPrimaryShardCount + routingNodes.size() - 1) / routingNodes.size(); + int unassignedRemotePrimaryShardCount = 0; + for (ShardRouting shard : routingNodes.unassigned()) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) && shard.primary()) { + unassignedRemotePrimaryShardCount++; + } + } + totalPrimaryShardCount += unassignedRemotePrimaryShardCount; + final int avgPrimaryPerNode = (totalPrimaryShardCount + remoteRoutingNodes.size() - 1) / remoteRoutingNodes.size(); - ArrayDeque sourceNodes = new ArrayDeque<>(); - ArrayDeque targetNodes = new ArrayDeque<>(); + final ArrayDeque sourceNodes = new ArrayDeque<>(); + final ArrayDeque targetNodes = new ArrayDeque<>(); for (RoutingNode node : remoteRoutingNodes) { if (nodePrimaryShardCount.get(node.nodeId()) > avgPrimaryPerNode) { sourceNodes.add(node); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java index 6a03a1f79bcde..a7f18aabf8436 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java @@ -194,7 +194,7 @@ public AllocationService createRemoteCapableAllocationService() { } public AllocationService createRemoteCapableAllocationService(String excludeNodes) { - Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.node_id", excludeNodes).build(); + Settings settings = Settings.builder().put("cluster.routing.allocation.exclude._id", excludeNodes).build(); return new MockAllocationService( randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()), new TestGatewayAllocator(), diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java index e1c0a7eff1f6e..e55a9de160114 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java @@ -25,25 +25,51 @@ public class RemoteShardsRebalanceShardsTests extends RemoteShardsBalancerBaseTe * Post rebalance primaries should be balanced across all the nodes. */ public void testShardAllocationAndRebalance() { - int localOnlyNodes = 20; - int remoteCapableNodes = 40; - int localIndices = 40; - int remoteIndices = 80; + final int localOnlyNodes = 20; + final int remoteCapableNodes = 40; + final int halfRemoteCapableNodes = remoteCapableNodes / 2; + final int localIndices = 40; + final int remoteIndices = 80; ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); - AllocationService service = this.createRemoteCapableAllocationService(); + final StringBuilder excludeNodes = new StringBuilder(); + for (int i = 0; i < halfRemoteCapableNodes; i++) { + excludeNodes.append(getNodeId(i, true)); + if (i != (remoteCapableNodes / 2 - 1)) { + excludeNodes.append(", "); + } + } + AllocationService service = this.createRemoteCapableAllocationService(excludeNodes.toString()); clusterState = allocateShardsAndBalance(clusterState, service); RoutingNodes routingNodes = clusterState.getRoutingNodes(); RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); - final Map nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true); - final Map nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false); + Map nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true); + Map nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false); int avgPrimariesPerNode = getTotalShardCountAcrossNodes(nodePrimariesCounter) / remoteCapableNodes; - // Primary and replica are balanced post first reroute + // Primary and replica are balanced after first allocating unassigned + for (RoutingNode node : routingNodes) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(node))) { + if (Integer.parseInt(node.nodeId().split("-")[4]) < halfRemoteCapableNodes) { + assertEquals(0, (int) nodePrimariesCounter.getOrDefault(node.nodeId(), 0)); + } else { + assertEquals(avgPrimariesPerNode * 2, (int) nodePrimariesCounter.get(node.nodeId())); + } + assertTrue(nodeReplicaCounter.getOrDefault(node.nodeId(), 0) >= 0); + } + } + + // Remove exclude constraint and rebalance + service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + routingNodes = clusterState.getRoutingNodes(); + allocation = getRoutingAllocation(clusterState, routingNodes); + nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true); + nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false); for (RoutingNode node : routingNodes) { if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(node))) { - assertInRange(nodePrimariesCounter.get(node.nodeId()), avgPrimariesPerNode, remoteCapableNodes - 1); - assertTrue(nodeReplicaCounter.get(node.nodeId()) >= 0); + assertEquals(avgPrimariesPerNode, (int) nodePrimariesCounter.get(node.nodeId())); + assertTrue(nodeReplicaCounter.getOrDefault(node.nodeId(), 0) >= 0); } } }