Skip to content

Commit

Permalink
Add more unit tests and minor code clean up
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Aug 9, 2022
1 parent a62d02c commit ac47ffd
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS
NodeGatewayStartedShards::primary
).reversed();

private static final Comparator<NodeGatewayStartedShards> HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR = Comparator.comparing(
NodeGatewayStartedShards::replicationCheckpoint
private static final Comparator<NodeGatewayStartedShards> HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR = Comparator.nullsLast(
Comparator.comparing(NodeGatewayStartedShards::replicationCheckpoint)
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ protected NodesGatewayStartedShards newResponse(
protected NodeGatewayStartedShards nodeOperation(NodeRequest request) {
try {
final ShardId shardId = request.getShardId();
ReplicationCheckpoint replicationCheckpoint;
logger.trace("{} loading local shard state info", shardId);
ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState(
logger,
Expand Down Expand Up @@ -207,13 +206,12 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) {

logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata);
String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null;
IndexShard shard = indicesService.getShardOrNull(shardId);
replicationCheckpoint = shard != null ? shard.getLatestReplicationCheckpoint() : null;
final IndexShard shard = indicesService.getShardOrNull(shardId);
return new NodeGatewayStartedShards(
clusterService.localNode(),
allocationId,
shardStateMetadata.primary,
replicationCheckpoint
shard != null ? shard.getLatestReplicationCheckpoint() : null
);
}
logger.trace("{} no local shard info found", shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void testPreferReplicaWithHighestPrimaryTerm() {
this.testAllocator.enableSegmentReplication();
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 10, 120, 2));
testAllocator.addData(node3, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand All @@ -241,6 +241,40 @@ public void testPreferReplicaWithHighestPrimaryTerm() {
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}

/**
* Tests that replica with highest primary ter version will be selected as target
*/
public void testPreferReplicaWithNullReplicationCheckpoint() {
String allocId1 = randomAlphaOfLength(10);
String allocId2 = randomAlphaOfLength(10);
String allocId3 = randomAlphaOfLength(10);
final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(
yesAllocationDeciders(),
CLUSTER_RECOVERED,
allocId1,
allocId2,
allocId3
);
this.testAllocator.enableSegmentReplication();
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1));
testAllocator.addData(node2, allocId2, false);
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 10, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(
allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(node3.getId())
);
// Assert node3's allocation id should be used as it has highest replication checkpoint
assertThat(
allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(),
equalTo(allocId3)
);
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}

/**
* Tests that replica with highest segment info version will be selected as target on equal primary terms
*/
Expand All @@ -258,7 +292,7 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() {
this.testAllocator.enableSegmentReplication();
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 3));
testAllocator.addData(node3, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand All @@ -276,7 +310,7 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() {
}

/**
* Tests that prefer allocation of older primary even though having lower replication checkpoint
* Tests that prefer allocation of replica at lower checkpoint but in sync set
*/
public void testOutOfSyncHighestRepCheckpointIsIgnored() {
String allocId1 = randomAlphaOfLength(10);
Expand All @@ -300,7 +334,7 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() {
allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(node3.getId())
);
// Assert node2's allocation id is used with highest replication checkpoint
// Assert node3's allocation id is used
assertThat(
allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(),
equalTo(allocId3)
Expand All @@ -309,7 +343,7 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() {
}

/**
* Tests that prefer allocation of older primary even though having lower replication checkpoint
* Tests that prefer allocation of older primary over replica with higher replication checkpoint
*/
public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() {
String allocId1 = randomAlphaOfLength(10);
Expand All @@ -334,7 +368,7 @@ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() {
allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(node1.getId())
);
// Assert node2's allocation id is used with highest replication checkpoint
// Assert node1's allocation id is used with highest replication checkpoint
assertThat(
allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(),
equalTo(allocId1)
Expand Down

0 comments on commit ac47ffd

Please sign in to comment.