Skip to content

Commit

Permalink
Speedup BalanceUnbalancedClusterTests (#88794)
Browse files Browse the repository at this point in the history
This commit speeds up the above tests (and probably many others) by changing how
 we assert the invariant. Previously invariant was checked by rebuilding
 internal collections from scratch and comparing them against ones already
 present in the object after every single modification twice. This commit
 verifies the invariant once after all bulk changes.
  • Loading branch information
idegtiarenko authored Jul 28, 2022
1 parent 664c12c commit c755e7b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.index.shard.ShardId;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -26,7 +25,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards
Expand Down Expand Up @@ -118,7 +116,14 @@ public int size() {
* @param shard Shard to create on this Node
*/
void add(ShardRouting shard) {
assert invariant();
addInternal(shard, true);
}

void addWithoutValidation(ShardRouting shard) {
addInternal(shard, false);
}

private void addInternal(ShardRouting shard, boolean validate) {
final ShardRouting existing = shards.putIfAbsent(shard.shardId(), shard);
if (existing != null) {
final IllegalStateException e = new IllegalStateException(
Expand All @@ -142,11 +147,10 @@ void add(ShardRouting shard) {
relocatingShards.add(shard);
}
shardsByIndex.computeIfAbsent(shard.index(), k -> new HashSet<>()).add(shard);
assert invariant();
assert validate == false || invariant();
}

void update(ShardRouting oldShard, ShardRouting newShard) {
assert invariant();
if (shards.containsKey(oldShard.shardId()) == false) {
// Shard was already removed by routing nodes iterator
// TODO: change caller logic in RoutingNodes so that this check can go away
Expand Down Expand Up @@ -174,7 +178,6 @@ void update(ShardRouting oldShard, ShardRouting newShard) {
}

void remove(ShardRouting shard) {
assert invariant();
ShardRouting previousValue = shards.remove(shard.shardId());
assert previousValue == shard : "expected shard " + previousValue + " but was " + shard;
if (shard.initializing()) {
Expand Down Expand Up @@ -342,20 +345,24 @@ public boolean isEmpty() {
return shards.isEmpty();
}

private boolean invariant() {
// initializingShards must consistent with that in shards
Collection<ShardRouting> shardRoutingsInitializing = shards.values().stream().filter(ShardRouting::initializing).toList();
assert initializingShards.size() == shardRoutingsInitializing.size();
assert initializingShards.containsAll(shardRoutingsInitializing);
boolean invariant() {
var shardRoutingsInitializing = new ArrayList<ShardRouting>(shards.size());
var shardRoutingsRelocating = new ArrayList<ShardRouting>(shards.size());
// this guess assumes 1 shard per index, this is not precise, but okay for assertion
var shardRoutingsByIndex = Maps.<Index, Set<ShardRouting>>newHashMapWithExpectedSize(shards.size());

// relocatingShards must consistent with that in shards
Collection<ShardRouting> shardRoutingsRelocating = shards.values().stream().filter(ShardRouting::relocating).toList();
assert relocatingShards.size() == shardRoutingsRelocating.size();
assert relocatingShards.containsAll(shardRoutingsRelocating);
for (var shard : shards.values()) {
if (shard.initializing()) {
shardRoutingsInitializing.add(shard);
}
if (shard.relocating()) {
shardRoutingsRelocating.add(shard);
}
shardRoutingsByIndex.computeIfAbsent(shard.index(), k -> new HashSet<>(10)).add(shard);
}

final Map<Index, Set<ShardRouting>> shardRoutingsByIndex = shards.values()
.stream()
.collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet()));
assert initializingShards.size() == shardRoutingsInitializing.size() && initializingShards.containsAll(shardRoutingsInitializing);
assert relocatingShards.size() == shardRoutingsRelocating.size() && relocatingShards.containsAll(shardRoutingsRelocating);
assert shardRoutingsByIndex.equals(shardsByIndex);

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,16 @@ private RoutingNodes(RoutingTable routingTable, DiscoveryNodes discoveryNodes, b
// A replica Set might have one (and not more) replicas with the state of RELOCATING.
if (shard.assignedToNode()) {
// LinkedHashMap to preserve order
nodesToShards.computeIfAbsent(shard.currentNodeId(), createRoutingNode).add(shard);
nodesToShards.computeIfAbsent(shard.currentNodeId(), createRoutingNode).addWithoutValidation(shard);
assignedShardsAdd(shard);
if (shard.relocating()) {
relocatingShards++;
ShardRouting targetShardRouting = shard.getTargetRelocatingShard();
addInitialRecovery(targetShardRouting, indexShard.primary);
// LinkedHashMap to preserve order.
// Add the counterpart shard with relocatingNodeId reflecting the source from which it's relocating from.
nodesToShards.computeIfAbsent(shard.relocatingNodeId(), createRoutingNode).add(targetShardRouting);
nodesToShards.computeIfAbsent(shard.relocatingNodeId(), createRoutingNode)
.addWithoutValidation(targetShardRouting);
assignedShardsAdd(targetShardRouting);
} else if (shard.initializing()) {
if (shard.primary()) {
Expand All @@ -145,6 +146,7 @@ private RoutingNodes(RoutingTable routingTable, DiscoveryNodes discoveryNodes, b
}
}
}
nodesToShards.values().forEach(RoutingNode::invariant);
}

private RoutingNodes(RoutingNodes routingNodes) {
Expand Down

0 comments on commit c755e7b

Please sign in to comment.