Skip to content

Commit

Permalink
Stabilizing org.opensearch.cluster.routing.MovePrimaryFirstTests.test… (
Browse files Browse the repository at this point in the history
opensearch-project#2048)

* Stabilizing org.opensearch.cluster.routing.MovePrimaryFirstTests.testClusterGreenAfterPartialRelocation

Signed-off-by: Ankit Jain <[email protected]>

* Removing unused import

Signed-off-by: Ankit Jain <[email protected]>

* Making code more readable

Signed-off-by: Ankit Jain <[email protected]>
(cherry picked from commit 343b82f)
  • Loading branch information
jainankitk committed Feb 10, 2022
1 parent a9ad1d9 commit 66abfcb
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
Expand Down Expand Up @@ -84,7 +85,7 @@ public ShardRouting get(ShardId shardId) {
return this.shardTuple.v2().get(shardId);
}

public ShardRouting add(ShardRouting shardRouting) {
public ShardRouting put(ShardRouting shardRouting) {
return put(shardRouting.shardId(), shardRouting);
}

Expand Down Expand Up @@ -114,22 +115,10 @@ public ShardRouting remove(ShardId shardId) {

@Override
public Iterator<ShardRouting> iterator() {
final Iterator<ShardRouting> primaryIterator = Collections.unmodifiableCollection(this.shardTuple.v1().values()).iterator();
final Iterator<ShardRouting> replicaIterator = Collections.unmodifiableCollection(this.shardTuple.v2().values()).iterator();
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
return primaryIterator.hasNext() || replicaIterator.hasNext();
}

@Override
public ShardRouting next() {
if (primaryIterator.hasNext()) {
return primaryIterator.next();
}
return replicaIterator.next();
}
};
return Stream.concat(
Collections.unmodifiableCollection(this.shardTuple.v1().values()).stream(),
Collections.unmodifiableCollection(this.shardTuple.v2().values()).stream()
).iterator();
}
}

Expand Down Expand Up @@ -217,7 +206,7 @@ public int size() {
*/
void add(ShardRouting shard) {
assert invariant();
if (shards.add(shard) != null) {
if (shards.put(shard) != null) {
throw new IllegalStateException(
"Trying to add a shard "
+ shard.shardId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

Expand Down Expand Up @@ -83,19 +86,25 @@ public void testClusterGreenAfterPartialRelocation() throws InterruptedException
final ClusterStateListener listener = event -> {
if (event.routingTableChanged()) {
final RoutingNodes routingNodes = event.state().getRoutingNodes();
int startedz2n1 = 0;
int startedz2n2 = 0;
int startedCount = 0;
List<ShardRouting> initz2n1 = new ArrayList<>(), initz2n2 = new ArrayList<>();
for (Iterator<RoutingNode> it = routingNodes.iterator(); it.hasNext();) {
RoutingNode routingNode = it.next();
final String nodeName = routingNode.node().getName();
if (nodeName.equals(z2n1)) {
startedz2n1 = routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
startedCount += routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
initz2n1 = routingNode.shardsWithState(ShardRoutingState.INITIALIZING);
} else if (nodeName.equals(z2n2)) {
startedz2n2 = routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
startedCount += routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
initz2n2 = routingNode.shardsWithState(ShardRoutingState.INITIALIZING);
}
}
if (startedz2n1 >= primaryShardCount / 2 && startedz2n2 >= primaryShardCount / 2) {
primaryMoveLatch.countDown();
if (!Stream.concat(initz2n1.stream(), initz2n2.stream()).anyMatch(s -> s.primary())) {
// All primaries are relocated before 60% of total shards are started on new nodes
final int totalShardCount = primaryShardCount * 2;
if (primaryShardCount <= startedCount && startedCount <= 3 * totalShardCount / 5) {
primaryMoveLatch.countDown();
}
}
}
};
Expand Down

0 comments on commit 66abfcb

Please sign in to comment.