Skip to content

Commit

Permalink
Fix IndexRecoveryIT.testRecoverLocallyUpToGlobalCheckpoint (#75898)
Browse files Browse the repository at this point in the history
In certain scenarios the synced global checkpoint can fall behind
due to a slow disk. The test assumed that the global checkpoint was
already stable at the moment it was fetched. This commit adds a new
method that waits until the global checkpoint is stable before the
test reads it.

Closes #75451
  • Loading branch information
fcofdez authored Aug 2, 2021
1 parent b0f68ef commit 6099d68
Showing 1 changed file with 23 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
Expand All @@ -139,6 +140,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.oneOf;

@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -1318,6 +1320,8 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
connection.sendRequest(requestId, action, request, options);
});
}
assertGlobalCheckpointIsStableAndSyncedInAllNodes(indexName, nodes,0);

IndexShard shard = internalCluster().getInstance(IndicesService.class, failingNode)
.getShardOrNull(new ShardId(resolveIndex(indexName), 0));
final long lastSyncedGlobalCheckpoint = shard.getLastSyncedGlobalCheckpoint();
Expand Down Expand Up @@ -1667,9 +1671,9 @@ public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception {
internalCluster().startDataOnlyNode(randomNodeDataPathSettings);
ensureGreen();
for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()) {
assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
assertThat(shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(NO_OPS_PERFORMED));
assertThat(shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(NO_OPS_PERFORMED));
assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(NO_OPS_PERFORMED));
}
}

Expand Down Expand Up @@ -1795,4 +1799,20 @@ public void testReservesBytesDuringPeerRecoveryPhaseOne() throws Exception {
.mapToLong(n -> n.getIndices().getStore().getReservedSize().getBytes()).sum(), equalTo(0L));
}

/**
 * Verifies that the given shard has the same max sequence number on every listed node and
 * that each node's last synced global checkpoint reaches that sequence number.
 *
 * The max sequence number of the shard copy on the first node in {@code nodes} is used as the
 * reference value. The per-node max sequence number is checked once; the last synced global
 * checkpoint is checked through {@code assertBusy}, since it can lag behind (e.g. on a slow disk).
 *
 * @param indexName name of the index whose shard is inspected
 * @param nodes     names of the nodes expected to hold a copy of the shard; must not be empty
 * @param shard     id of the shard within the index
 * @throws Exception if propagated from {@code assertBusy}
 */
private void assertGlobalCheckpointIsStableAndSyncedInAllNodes(String indexName, List<String> nodes, int shard) throws Exception {
    assertThat(nodes, is(not(empty())));

    final ShardId targetShardId = new ShardId(resolveIndex(indexName), shard);

    // The copy on the first node supplies the reference max sequence number.
    final IndicesService referenceIndicesService = internalCluster().getInstance(IndicesService.class, nodes.get(0));
    final IndexShard referenceShard = referenceIndicesService.getShardOrNull(targetShardId);
    assertThat(referenceShard, is(notNullValue()));
    final long expectedSeqNo = referenceShard.seqNoStats().getMaxSeqNo();

    for (final String nodeName : nodes) {
        final IndicesService nodeIndicesService = internalCluster().getInstance(IndicesService.class, nodeName);
        final IndexShard shardCopy = nodeIndicesService.getShardOrNull(targetShardId);
        assertThat(shardCopy, is(notNullValue()));

        // Every copy must already have seen the same operations as the reference copy.
        final long copyMaxSeqNo = shardCopy.seqNoStats().getMaxSeqNo();
        assertThat(copyMaxSeqNo, is(equalTo(expectedSeqNo)));

        // The synced global checkpoint may trail behind, so check it via assertBusy.
        assertBusy(() -> {
            final long syncedGlobalCheckpoint = shardCopy.getLastSyncedGlobalCheckpoint();
            assertThat(syncedGlobalCheckpoint, equalTo(expectedSeqNo));
        });
    }
}
}

0 comments on commit 6099d68

Please sign in to comment.