Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize primary shard movement during shard relocation #1445

Merged
merged 12 commits into from
Jan 20, 2022
Merged
129 changes: 102 additions & 27 deletions server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,63 +48,140 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards
* that are hosted on that node. Each {@link RoutingNode} has a unique node id that can be used to identify the node.
*/
public class RoutingNode implements Iterable<ShardRouting> {

static class BucketedShards implements Iterable<ShardRouting> {
private static Map<Boolean, Integer> map = new HashMap<Boolean, Integer>() {
{
put(true, 0);
put(false, 1);
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the only use case indexing into the shards array (0 or 1)? Would an enum be better?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I am missing something here, but an enum would define a new type, which is different from the boolean returned by shardRouting.primary()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, an enum won't work here. We can leave it as is.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe

org.opensearch.common.collect.Map.of(true, 0, false, 1)


private final LinkedHashMap<ShardId, ShardRouting>[] shards; // LinkedHashMap to preserve order

BucketedShards(LinkedHashMap<ShardId, ShardRouting> primaryShards, LinkedHashMap<ShardId, ShardRouting> replicaShards) {
this.shards = new LinkedHashMap[2];
this.shards[0] = primaryShards;
this.shards[1] = replicaShards;
}
jainankitk marked this conversation as resolved.
Show resolved Hide resolved

/**
 * @return {@code true} when neither bucket holds any entry
 */
public boolean isEmpty() {
    for (LinkedHashMap<ShardId, ShardRouting> bucket : this.shards) {
        if (bucket.isEmpty() == false) {
            return false;
        }
    }
    return true;
}

/**
 * @return the combined number of entries across both buckets
 */
public int size() {
    int total = 0;
    for (LinkedHashMap<ShardId, ShardRouting> bucket : this.shards) {
        total += bucket.size();
    }
    return total;
}

/**
 * Checks both buckets for the given shard id, bucket 0 first.
 *
 * @param shardId the shard id to look for
 * @return {@code true} if either bucket has a mapping for {@code shardId}
 */
public boolean containsKey(ShardId shardId) {
    for (LinkedHashMap<ShardId, ShardRouting> bucket : this.shards) {
        if (bucket.containsKey(shardId)) {
            return true;
        }
    }
    return false;
}

/**
 * Looks up the routing stored for a shard id, preferring bucket 0 over bucket 1.
 *
 * @param shardId the shard id to look up
 * @return the value mapped in bucket 0 if that bucket has the key, otherwise whatever
 *         bucket 1 maps the key to ({@code null} when it has no mapping)
 */
public ShardRouting get(ShardId shardId) {
    final LinkedHashMap<ShardId, ShardRouting> first = this.shards[0];
    return first.containsKey(shardId) ? first.get(shardId) : this.shards[1].get(shardId);
}

public ShardRouting add(ShardRouting shardRouting) {
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
return put(shardRouting.shardId(), shardRouting);
}

/**
 * Stores {@code shardRouting} under {@code shardId} in the bucket selected by
 * {@code shardRouting.primary()} and drops any entry for the same id from the other bucket.
 *
 * @param shardId      key to store the routing under
 * @param shardRouting routing to store; its {@code primary()} flag selects the bucket
 * @return the entry removed from the other bucket if it had one for {@code shardId};
 *         otherwise the value previously mapped in the selected bucket ({@code null} if none)
 */
public ShardRouting put(ShardId shardId, ShardRouting shardRouting) {
    final boolean primary = shardRouting.primary();
    final LinkedHashMap<ShardId, ShardRouting> target = this.shards[map.get(primary)];
    final LinkedHashMap<ShardId, ShardRouting> opposite = this.shards[map.get(!primary)];

    final ShardRouting replaced = target.put(shardId, shardRouting);
    // The same shard id must not stay behind in the other bucket once it is stored here.
    return opposite.containsKey(shardId) ? opposite.remove(shardId) : replaced;
}

/**
 * Removes the entry for a shard id, taking it from bucket 0 when that bucket has the key
 * and from bucket 1 otherwise.
 *
 * @param shardId the shard id whose entry should be removed
 * @return the removed value, or {@code null} if the chosen bucket had no mapping
 */
public ShardRouting remove(ShardId shardId) {
    final LinkedHashMap<ShardId, ShardRouting> owner = this.shards[0].containsKey(shardId) ? this.shards[0] : this.shards[1];
    return owner.remove(shardId);
}

@Override
public Iterator<ShardRouting> iterator() {
final Iterator<ShardRouting> iterator1 = Collections.unmodifiableCollection(shards[0].values()).iterator();
final Iterator<ShardRouting> iterator2 = Collections.unmodifiableCollection(shards[1].values()).iterator();
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
return iterator1.hasNext() || iterator2.hasNext();
}

@Override
public ShardRouting next() {
if (iterator1.hasNext()) {
return iterator1.next();
}
return iterator2.next();
}
};
}
}

private final String nodeId;

private final DiscoveryNode node;

private final LinkedHashMap<ShardId, ShardRouting> shards; // LinkedHashMap to preserve order
private final BucketedShards shards;

private final LinkedHashSet<ShardRouting> initializingShards;

private final LinkedHashSet<ShardRouting> relocatingShards;

private final HashMap<Index, LinkedHashSet<ShardRouting>> shardsByIndex;

public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this(nodeId, node, buildShardRoutingMap(shards));
}

RoutingNode(String nodeId, DiscoveryNode node, LinkedHashMap<ShardId, ShardRouting> shards) {
public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shardRoutings) {
this.nodeId = nodeId;
this.node = node;
this.shards = shards;
final LinkedHashMap<ShardId, ShardRouting> primaryShards = new LinkedHashMap<>();
final LinkedHashMap<ShardId, ShardRouting> replicaShards = new LinkedHashMap<>();
this.shards = new BucketedShards(primaryShards, replicaShards);
this.relocatingShards = new LinkedHashSet<>();
this.initializingShards = new LinkedHashSet<>();
this.shardsByIndex = new LinkedHashMap<>();
for (ShardRouting shardRouting : shards.values()) {

for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting.initializing()) {
initializingShards.add(shardRouting);
} else if (shardRouting.relocating()) {
relocatingShards.add(shardRouting);
}
shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting);
}
assert invariant();
}

private static LinkedHashMap<ShardId, ShardRouting> buildShardRoutingMap(ShardRouting... shardRoutings) {
final LinkedHashMap<ShardId, ShardRouting> shards = new LinkedHashMap<>();
for (ShardRouting shardRouting : shardRoutings) {
ShardRouting previousValue = shards.put(shardRouting.shardId(), shardRouting);
ShardRouting previousValue;
if (shardRouting.primary()) {
previousValue = primaryShards.put(shardRouting.shardId(), shardRouting);
} else {
previousValue = replicaShards.put(shardRouting.shardId(), shardRouting);
}

if (previousValue != null) {
throw new IllegalArgumentException(
"Cannot have two different shards with same shard id " + shardRouting.shardId() + " on same node "
);
}
}
return shards;

assert invariant();
}

@Override
public Iterator<ShardRouting> iterator() {
return Collections.unmodifiableCollection(shards.values()).iterator();
return shards.iterator();
}

/**
Expand Down Expand Up @@ -139,7 +216,7 @@ public int size() {
*/
void add(ShardRouting shard) {
assert invariant();
if (shards.containsKey(shard.shardId())) {
if (shards.add(shard) != null) {
throw new IllegalStateException(
"Trying to add a shard "
+ shard.shardId()
Expand All @@ -152,7 +229,6 @@ void add(ShardRouting shard) {
+ "]"
);
}
shards.put(shard.shardId(), shard);

if (shard.initializing()) {
initializingShards.add(shard);
Expand Down Expand Up @@ -322,7 +398,7 @@ public int numberOfOwningShardsForIndex(final Index index) {
public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("-----node_id[").append(nodeId).append("][").append(node == null ? "X" : "V").append("]\n");
for (ShardRouting entry : shards.values()) {
for (ShardRouting entry : shards) {
sb.append("--------").append(entry.shortSummary()).append('\n');
}
return sb.toString();
Expand All @@ -345,7 +421,9 @@ public String toString() {
}

public List<ShardRouting> copyShards() {
return new ArrayList<>(shards.values());
List<ShardRouting> result = new ArrayList<>();
shards.forEach(result::add);
return result;
}

public boolean isEmpty() {
Expand All @@ -355,23 +433,20 @@ public boolean isEmpty() {
private boolean invariant() {

// initializingShards must be consistent with that in shards
Collection<ShardRouting> shardRoutingsInitializing = shards.values()
.stream()
Collection<ShardRouting> shardRoutingsInitializing = StreamSupport.stream(shards.spliterator(), false)
.filter(ShardRouting::initializing)
.collect(Collectors.toList());
assert initializingShards.size() == shardRoutingsInitializing.size();
assert initializingShards.containsAll(shardRoutingsInitializing);

// relocatingShards must be consistent with that in shards
Collection<ShardRouting> shardRoutingsRelocating = shards.values()
.stream()
Collection<ShardRouting> shardRoutingsRelocating = StreamSupport.stream(shards.spliterator(), false)
.filter(ShardRouting::relocating)
.collect(Collectors.toList());
assert relocatingShards.size() == shardRoutingsRelocating.size();
assert relocatingShards.containsAll(shardRoutingsRelocating);

final Map<Index, Set<ShardRouting>> shardRoutingsByIndex = shards.values()
.stream()
final Map<Index, Set<ShardRouting>> shardRoutingsByIndex = StreamSupport.stream(shards.spliterator(), false)
.collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet()));
assert shardRoutingsByIndex.equals(shardsByIndex);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
Expand Down Expand Up @@ -108,10 +107,10 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
this.readOnly = readOnly;
final RoutingTable routingTable = clusterState.routingTable();

Map<String, LinkedHashMap<ShardId, ShardRouting>> nodesToShards = new HashMap<>();
// fill in the nodeToShards with the "live" nodes
for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
nodesToShards.put(cursor.value.getId(), new LinkedHashMap<>()); // LinkedHashMap to preserve order
String nodeId = cursor.value.getId();
this.nodesToShards.put(cursor.value.getId(), new RoutingNode(nodeId, clusterState.nodes().get(nodeId)));
}

// fill in the inverse of node -> shards allocated
Expand All @@ -125,27 +124,23 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
// by the ShardId, as this is common for primary and replicas.
// A replica Set might have one (and not more) replicas with the state of RELOCATING.
if (shard.assignedToNode()) {
Map<ShardId, ShardRouting> entries = nodesToShards.computeIfAbsent(
RoutingNode routingNode = this.nodesToShards.computeIfAbsent(
shard.currentNodeId(),
k -> new LinkedHashMap<>()
); // LinkedHashMap to preserve order
ShardRouting previousValue = entries.put(shard.shardId(), shard);
if (previousValue != null) {
throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node");
}
k -> new RoutingNode(shard.currentNodeId(), clusterState.nodes().get(shard.currentNodeId()))
);
routingNode.add(shard);
assignedShardsAdd(shard);
if (shard.relocating()) {
relocatingShards++;
// LinkedHashMap to preserve order.
// Add the counterpart shard with relocatingNodeId reflecting the source from which
// it's relocating from.
entries = nodesToShards.computeIfAbsent(shard.relocatingNodeId(), k -> new LinkedHashMap<>());
routingNode = nodesToShards.computeIfAbsent(
shard.relocatingNodeId(),
k -> new RoutingNode(shard.relocatingNodeId(), clusterState.nodes().get(shard.relocatingNodeId()))
);
ShardRouting targetShardRouting = shard.getTargetRelocatingShard();
addInitialRecovery(targetShardRouting, indexShard.primary);
previousValue = entries.put(targetShardRouting.shardId(), targetShardRouting);
if (previousValue != null) {
throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node");
}
routingNode.add(targetShardRouting);
assignedShardsAdd(targetShardRouting);
} else if (shard.initializing()) {
if (shard.primary()) {
Expand All @@ -160,10 +155,6 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
}
}
}
for (Map.Entry<String, LinkedHashMap<ShardId, ShardRouting>> entry : nodesToShards.entrySet()) {
String nodeId = entry.getKey();
this.nodesToShards.put(nodeId, new RoutingNode(nodeId, clusterState.nodes().get(nodeId), entry.getValue()));
}
}

private void addRecovery(ShardRouting routing) {
Expand Down Expand Up @@ -1296,24 +1287,51 @@ public Iterator<ShardRouting> nodeInterleavedShardIterator() {
queue.add(entry.getValue().copyShards().iterator());
}
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> replicaShards = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> replicaIterators = new ArrayDeque<>();

public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
if (!replicaShards.isEmpty()) {
return true;
}
while (!replicaIterators.isEmpty()) {
if (replicaIterators.peek().hasNext()) {
return true;
}
replicaIterators.poll();
}
return false;
}

public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
Iterator<ShardRouting> iter = queue.poll();
ShardRouting result = iter.next();
queue.offer(iter);
return result;
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
if (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary()) {
queue.offer(iter);
return result;
}
replicaShards.offer(result);
replicaIterators.offer(iter);
}
}
if (!replicaShards.isEmpty()) {
return replicaShards.poll();
}
Iterator<ShardRouting> replicaIterator = replicaIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
replicaIterators.offer(replicaIterator);
return replicaShard;
}

public void remove() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.test.OpenSearchTestCase;

import java.net.InetAddress;
import java.util.Iterator;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -86,6 +87,32 @@ public void testAdd() {
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetadata.INDEX_UUID_NA_VALUE, 4)), equalTo(relocatingShard0));
}

/**
 * Adds one replica (shard 3, INITIALIZING) and one primary (shard 4, RELOCATING) to the
 * routing node and verifies the exact order in which {@code routingNode.iterator()} yields
 * shards: the newly added primary comes first and the newly added replica comes last, after
 * the shards the node already held (fields populated outside this method).
 */
public void testPrimaryFirstIterator() {
    final ShardRouting replicaShard3 = TestShardRouting.newShardRouting("test", 3, "node-1", false, ShardRoutingState.INITIALIZING);
    final ShardRouting primaryShard4 = TestShardRouting.newShardRouting(
        "test",
        4,
        "node-1",
        "node-2",
        true,
        ShardRoutingState.RELOCATING
    );
    routingNode.add(replicaShard3);
    routingNode.add(primaryShard4);

    final ShardRouting[] expectedOrder = {
        primaryShard4,
        unassignedShard0,
        initializingShard0,
        relocatingShard0,
        replicaShard3 };

    final Iterator<ShardRouting> it = routingNode.iterator();
    for (ShardRouting expected : expectedOrder) {
        assertTrue(it.hasNext());
        assertThat(it.next(), equalTo(expected));
    }
    // Nothing may follow the last expected shard.
    assertFalse(it.hasNext());
}

public void testUpdate() {
ShardRouting startedShard0 = TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED);
ShardRouting startedShard1 = TestShardRouting.newShardRouting("test", 1, "node-1", "node-2", false, ShardRoutingState.RELOCATING);
Expand Down