Stabilizing org.opensearch.cluster.routing.MovePrimaryFirstTests.test… #2048

Merged Feb 8, 2022 (3 commits). The diff below shows changes from 2 of those commits.
File 1:

@@ -49,6 +49,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.stream.StreamSupport;

 /**
@@ -84,7 +85,7 @@ public ShardRouting get(ShardId shardId) {
             return this.shardTuple.v2().get(shardId);
         }

-        public ShardRouting add(ShardRouting shardRouting) {
+        public ShardRouting put(ShardRouting shardRouting) {
             return put(shardRouting.shardId(), shardRouting);
         }

@@ -114,22 +115,10 @@ public ShardRouting remove(ShardId shardId) {

         @Override
         public Iterator<ShardRouting> iterator() {
-            final Iterator<ShardRouting> primaryIterator = Collections.unmodifiableCollection(this.shardTuple.v1().values()).iterator();
-            final Iterator<ShardRouting> replicaIterator = Collections.unmodifiableCollection(this.shardTuple.v2().values()).iterator();
-            return new Iterator<ShardRouting>() {
-                @Override
-                public boolean hasNext() {
-                    return primaryIterator.hasNext() || replicaIterator.hasNext();
-                }
-
-                @Override
-                public ShardRouting next() {
-                    if (primaryIterator.hasNext()) {
-                        return primaryIterator.next();
-                    }
-                    return replicaIterator.next();
-                }
-            };
+            return Stream.concat(
+                Collections.unmodifiableCollection(this.shardTuple.v1().values()).stream(),
+                Collections.unmodifiableCollection(this.shardTuple.v2().values()).stream()
+            ).iterator();
         }
     }

@@ -217,7 +206,7 @@ public int size() {
      */
     void add(ShardRouting shard) {
         assert invariant();
-        if (shards.add(shard) != null) {
+        if (shards.put(shard) != null) {
             throw new IllegalStateException(
                 "Trying to add a shard "
                     + shard.shardId()
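The replacement iterator leans on the fact that Stream.concat is ordered: it yields every element of the first stream before any element of the second, so primaries are still iterated before replicas, matching the hand-rolled two-iterator implementation it removes. A minimal standalone sketch of that behavior (plain strings stand in for the v1()/v2() shard maps; this is not the OpenSearch code):

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.stream.Stream;

    public class ConcatIterationOrder {
        public static void main(String[] args) {
            // Stand-ins for shardTuple.v1() (primaries) and shardTuple.v2() (replicas).
            Map<String, String> primaries = new LinkedHashMap<>();
            primaries.put("shard-0", "primary-0");
            primaries.put("shard-1", "primary-1");
            Map<String, String> replicas = new LinkedHashMap<>();
            replicas.put("shard-0", "replica-0");
            replicas.put("shard-1", "replica-1");

            // Stream.concat exhausts the first stream before the second, so the
            // primaries-before-replicas contract of the old iterator is preserved.
            Iterator<String> it = Stream.concat(
                primaries.values().stream(),
                replicas.values().stream()
            ).iterator();
            while (it.hasNext()) {
                System.out.println(it.next()); // primary-0, primary-1, replica-0, replica-1
            }
        }
    }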
File 2 (the MovePrimaryFirstTests integration test):

@@ -12,12 +12,14 @@
 import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.opensearch.cluster.ClusterStateListener;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
 import org.opensearch.test.InternalTestCluster;
 import org.opensearch.test.OpenSearchIntegTestCase;

 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.Stream;

 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@@ -84,19 +86,24 @@ public void testClusterGreenAfterPartialRelocation() throws InterruptedException
         final ClusterStateListener listener = event -> {
             if (event.routingTableChanged()) {
                 final RoutingNodes routingNodes = event.state().getRoutingNodes();
-                int startedz2n1 = 0;
-                int startedz2n2 = 0;
+                int startedCount = 0;
+                List<ShardRouting> initz2n1 = new ArrayList<>(), initz2n2 = new ArrayList<>();
                 for (Iterator<RoutingNode> it = routingNodes.iterator(); it.hasNext();) {
                     RoutingNode routingNode = it.next();
                     final String nodeName = routingNode.node().getName();
                     if (nodeName.equals(z2n1)) {
-                        startedz2n1 = routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
+                        startedCount += routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
+                        initz2n1 = routingNode.shardsWithState(ShardRoutingState.INITIALIZING);
                     } else if (nodeName.equals(z2n2)) {
-                        startedz2n2 = routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
+                        startedCount += routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
+                        initz2n2 = routingNode.shardsWithState(ShardRoutingState.INITIALIZING);
                     }
                 }
-                if (startedz2n1 >= primaryShardCount / 2 && startedz2n2 >= primaryShardCount / 2) {
-                    primaryMoveLatch.countDown();
+                if (!Stream.concat(initz2n1.stream(), initz2n2.stream()).anyMatch(s -> s.primary())) {
+                    // All primaries are relocated before 60% of overall shards are started on new nodes
+                    if (primaryShardCount <= startedCount && startedCount <= 6 * primaryShardCount / 5) {
+                        primaryMoveLatch.countDown();
+                    }
                 }
             }
         };

Review thread on the new threshold check (startedCount <= 6 * primaryShardCount / 5):

Member: Maybe it's obvious, but it is not immediately clear to me why the 6 * primaryShardCount / 5 math is correct for calculating that 60% of shards are started on new nodes. Can you explain how this works?

@jainankitk (Collaborator, Author), Feb 7, 2022: The total number of shards is double the primary shard count (1 replica): 2 * primaryShardCount. Hence, 60% of the total shards is 3 * (total number of shards) / 5, which is the same as 6 * primaryShardCount / 5.

Member: Got it. I suggest creating an intermediate variable just to make this more readable, like:

    final int totalShardCount = primaryShardCount * 2;
    if (primaryShardCount <= startedCount && startedCount <= totalShardCount * 3 / 5) {

@jainankitk (Collaborator, Author): Sure, makes sense.
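As a quick sanity check of the arithmetic in the thread (assuming a primaryShardCount of 100, chosen only for illustration):

    public class ThresholdMath {
        public static void main(String[] args) {
            final int primaryShardCount = 100;                 // assumed value for illustration
            final int totalShardCount = primaryShardCount * 2; // one replica per primary
            // 60% of all shards, written both ways from the thread; the two
            // expressions are algebraically identical since 2 * 3 == 6.
            System.out.println(totalShardCount * 3 / 5);   // 120
            System.out.println(6 * primaryShardCount / 5); // 120
        }
    }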
@@ -113,6 +120,6 @@ public void testClusterGreenAfterPartialRelocation() throws InterruptedException
             internalCluster().stopRandomNode(InternalTestCluster.nameFilter(z1n1));
             internalCluster().stopRandomNode(InternalTestCluster.nameFilter(z1n2));
         } catch (Exception e) {}
-        ensureGreen();
+        ensureGreen(TimeValue.timeValueSeconds(60));
     }
 }

Review thread on the stopRandomNode calls:

Member: Do we have to shut down nodes z2n1 and z2n2 as well here?

@jainankitk (Collaborator, Author): There are 4 nodes in the cluster. If we shut down all 4, the cluster will not be green. We want to shut down all excluded nodes (in this case 2) after 60% of the total shards have relocated to z2n1 and z2n2. Due to #1445, all primaries would have started within those 60%, and hence the cluster will eventually become green.
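The synchronization pattern the test relies on: the cluster-state listener counts down primaryMoveLatch once the relocation condition holds, and the main test thread awaits that latch before stopping the excluded nodes. A minimal sketch of the same gating pattern (a hypothetical background thread and condition stand in for the OpenSearch listener and test harness):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class LatchGateSketch {
        public static void main(String[] args) throws InterruptedException {
            final CountDownLatch primaryMoveLatch = new CountDownLatch(1);

            // Stand-in for the ClusterStateListener: counts the latch down
            // once the observed condition is satisfied.
            new Thread(() -> {
                boolean enoughShardsStarted = true; // hypothetical condition
                if (enoughShardsStarted) {
                    primaryMoveLatch.countDown();
                }
            }).start();

            // The test thread blocks here until the condition fires (or the
            // wait times out), then proceeds to the disruptive step.
            if (primaryMoveLatch.await(60, TimeUnit.SECONDS)) {
                System.out.println("Condition met: safe to stop the excluded nodes");
            } else {
                System.out.println("Timed out waiting for relocation");
            }
        }
    }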