Segment Replication - Fix incorrect maxSeqNo computation.
This change updates getLatestSegmentInfos to return only the max seqNo recorded in the previous
commit point. This is the only way to guarantee that all operations up to that seqNo have made it into the commit point.

Signed-off-by: Marc Handalian <[email protected]>
mch2 committed Mar 9, 2023
1 parent 9a763e8 commit b0c674c
Showing 2 changed files with 74 additions and 17 deletions.
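
Background on the fix: a Lucene commit point records the highest sequence number it durably contains in its user data, which is why reading the commit's user data (rather than the engine's live checkpoint) is the safe source for the checkpoint's seqNo. The standalone sketch below is not part of this commit; it only illustrates reading that value from the latest commit of an index directory with plain Lucene APIs, and the "max_seq_no" key name is assumed here for illustration.

import java.io.IOException;
import java.nio.file.Paths;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class CommitMaxSeqNoSketch {
    // Key under which the max sequence number is recorded in commit user data (assumed here).
    private static final String MAX_SEQ_NO = "max_seq_no";

    // Returns the max seqNo recorded in the latest commit point, or -1 if the key is absent.
    static long maxSeqNoFromLatestCommit(Directory directory) throws IOException {
        SegmentInfos infos = SegmentInfos.readLatestCommit(directory);
        String value = infos.getUserData().get(MAX_SEQ_NO);
        return value == null ? -1L : Long.parseLong(value);
    }

    public static void main(String[] args) throws IOException {
        try (Directory directory = FSDirectory.open(Paths.get(args[0]))) {
            System.out.println("max_seq_no in latest commit: " + maxSeqNoFromLatestCommit(directory));
        }
    }
}
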
@@ -32,8 +32,8 @@

import static java.util.Arrays.asList;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -422,6 +422,68 @@ public void testDeleteOperations() throws Exception {
}
}

/**
* Tests that the max seqNo sent to replicas is accurate, and that after failover
* the new primary starts indexing from the correct maxSeqNo and replays the correct
* number of docs from the translog.
*/
public void testReplicationPostDeleteAndForceMerge() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(0, 200);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, primary, replica);

final int deletedDocCount = randomIntBetween(0, initialDocCount / 2);
for (int i = 0; i < deletedDocCount; i++) {
client(primary).prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
}
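// Force merge down to a single segment; setFlush(false) means no new commit point is written as part of the merge.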
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();

// randomly flush here after the force merge to wipe any old segments.
if (randomBoolean()) {
flush(INDEX_NAME);
}

final IndexShard primaryShard = getIndexShard(primary, INDEX_NAME);
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
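// Wait until the replica has caught up to the primary's latest segment infos version before failing over.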
assertBusy(
() -> assertEquals(
primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion()
)
);

// add some docs to the xlog and drop primary.
final int additionalDocs = randomIntBetween(1, 50);
for (int i = initialDocCount; i < initialDocCount + additionalDocs; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}
// Drop the primary and wait until replica is promoted.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
ensureYellowAndNoInitializingShards(INDEX_NAME);

final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica);
assertNotNull(replicaShardRouting);
assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
refresh(INDEX_NAME);
final long expectedHitCount = initialDocCount + additionalDocs - deletedDocCount;
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

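// Each index and delete operation consumes one sequence number, so the max seqNo is the total operation count minus one.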
int expectedMaxSeqNo = initialDocCount + deletedDocCount + additionalDocs - 1;
assertEquals(expectedMaxSeqNo, replicaShard.getLatestReplicationCheckpoint().getSeqNo());

// index another doc.
client().prepareIndex(INDEX_NAME).setId(String.valueOf(expectedMaxSeqNo + 1)).setSource("another", "doc").get();
refresh(INDEX_NAME);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount + 1);
}

public void testUpdateOperations() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
27 changes: 11 additions & 16 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1481,22 +1481,17 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
}
// do not close the snapshot - caller will close it.
final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
return Optional.ofNullable(snapshot.get()).map(segmentInfos -> {
try {
return new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
shardRouting.primary() ? getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos) : getProcessedLocalCheckpoint(),
segmentInfos.getVersion()
)
);
} catch (IOException e) {
throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e);
}
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
return Optional.ofNullable(snapshot.get()).map(segmentInfos -> new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
// fetch local cp from the previous commit, current local checkpoint may be ahead already.
Long.parseLong(segmentInfos.userData.get(MAX_SEQ_NO)),
segmentInfos.getVersion()
)
)).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
}

/**
