diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 9975a5ff65a34..7b40824644ed8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -32,8 +32,8 @@ import static java.util.Arrays.asList; import static org.opensearch.index.query.QueryBuilders.matchQuery; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -423,6 +423,68 @@ public void testDeleteOperations() throws Exception { } } + /** + * This tests that the max seqNo we send to replicas is accurate and that after failover + * the new primary starts indexing from the correct maxSeqNo and replays the correct count of docs + * from xlog. + */ + public void testReplicationPostDeleteAndForceMerge() throws Exception { + final String primary = internalCluster().startNode(); + createIndex(INDEX_NAME); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + final int initialDocCount = scaledRandomIntBetween(10, 200); + for (int i = 0; i < initialDocCount; i++) { + client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get(); + } + refresh(INDEX_NAME); + waitForSearchableDocs(initialDocCount, primary, replica); + + final int deletedDocCount = randomIntBetween(10, initialDocCount); + for (int i = 0; i < deletedDocCount; i++) { + client(primary).prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + } + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + + // randomly flush here after the force merge to wipe any old segments. + if (randomBoolean()) { + flush(INDEX_NAME); + } + + final IndexShard primaryShard = getIndexShard(primary, INDEX_NAME); + final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME); + assertBusy( + () -> assertEquals( + primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(), + replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion() + ) + ); + + // add some docs to the xlog and drop primary. + final int additionalDocs = randomIntBetween(1, 50); + for (int i = initialDocCount; i < initialDocCount + additionalDocs; i++) { + client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get(); + } + // Drop the primary and wait until replica is promoted. + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + + final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica); + assertNotNull(replicaShardRouting); + assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); + refresh(INDEX_NAME); + final long expectedHitCount = initialDocCount + additionalDocs - deletedDocCount; + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + int expectedMaxSeqNo = initialDocCount + deletedDocCount + additionalDocs - 1; + assertEquals(expectedMaxSeqNo, replicaShard.seqNoStats().getMaxSeqNo()); + + // index another doc. + client().prepareIndex(INDEX_NAME).setId(String.valueOf(expectedMaxSeqNo + 1)).setSource("another", "doc").get(); + refresh(INDEX_NAME); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount + 1); + } + public void testUpdateOperations() throws Exception { internalCluster().startClusterManagerOnlyNode(); final String primary = internalCluster().startDataOnlyNode(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 76461783fb724..cb5a7fbc3f181 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -42,6 +42,8 @@ import java.util.function.BiFunction; import java.util.stream.Stream; +import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; + /** * This is an {@link Engine} implementation intended for replica shards when Segment Replication * is enabled. This Engine does not create an IndexWriter, rather it refreshes a {@link NRTReplicationReaderManager} @@ -123,10 +125,11 @@ public TranslogManager translogManager() { return translogManager; } - public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { + public synchronized void updateSegments(final SegmentInfos infos) throws IOException { // Update the current infos reference on the Engine's reader. ensureOpen(); try (ReleasableLock lock = writeLock.acquire()) { + final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO)); final long incomingGeneration = infos.getGeneration(); readerManager.updateSegments(infos); @@ -134,11 +137,11 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th // lower/higher gens are possible from a new primary that was just elected. if (incomingGeneration != lastReceivedGen) { commitSegmentInfos(); - translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo); + translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); translogManager.rollTranslogGeneration(); } lastReceivedGen = incomingGeneration; - localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); + localCheckpointTracker.fastForwardProcessedSeqNo(maxSeqNo); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index dd180b95e6b96..9999b56f189c2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1425,7 +1425,7 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException { if (getReplicationEngine().isPresent()) { - getReplicationEngine().get().updateSegments(infos, seqNo); + getReplicationEngine().get().updateSegments(infos); } } @@ -1478,22 +1478,15 @@ public Tuple, ReplicationCheckpoint> getLatestSegme } // do not close the snapshot - caller will close it. final GatedCloseable snapshot = getSegmentInfosSnapshot(); - return Optional.ofNullable(snapshot.get()).map(segmentInfos -> { - try { - return new Tuple<>( - snapshot, - new ReplicationCheckpoint( - this.shardId, - getOperationPrimaryTerm(), - segmentInfos.getGeneration(), - shardRouting.primary() ? getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos) : getProcessedLocalCheckpoint(), - segmentInfos.getVersion() - ) - ); - } catch (IOException e) { - throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e); - } - }).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); + return Optional.ofNullable(snapshot.get()).map(segmentInfos -> new Tuple<>( + snapshot, + new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + segmentInfos.getVersion() + ) + )).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 6f46fe8398388..1e36e7c155b02 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -220,7 +220,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, responseCheckpoint.getSegmentsGen() ); cancellableThreads.checkForCancel(); - indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); + indexShard.finalizeReplication(infos); store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 6a4e5e449f178..c03473e0a8c30 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -28,7 +28,6 @@ public class ReplicationCheckpoint implements Writeable, Comparable() { @Override @@ -322,7 +320,7 @@ public void getSegmentFiles( ); segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); - doThrow(exception).when(spyIndexShard).finalizeReplication(any(), anyLong()); + doThrow(exception).when(spyIndexShard).finalizeReplication(any()); segrepTarget.startReplication(new ActionListener() { @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 704b40b05c49d..ea405f5b90c14 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -103,7 +103,7 @@ public void testPublishCheckpointActionOnPrimary() { mockTargetService ); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); expectThrows(OpenSearchException.class, () -> { action.shardOperationOnPrimary(request, indexShard, mock(ActionListener.class)); }); @@ -135,7 +135,7 @@ public void testPublishCheckpointActionOnReplica() { mockTargetService ); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index 6f2be9db6b2dd..6b212a7021e4a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -67,13 +67,7 @@ public static IndexShard createMockIndexShard() throws IOException { when(mockShard.store()).thenReturn(mockStore); SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); - ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint( - mockShard.shardId(), - mockShard.getOperationPrimaryTerm(), - 0L, - mockShard.getProcessedLocalCheckpoint(), - 0L - ); + ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(mockShard.shardId(), mockShard.getOperationPrimaryTerm(), 0L, 0L); final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( new GatedCloseable<>(testSegmentInfos, () -> {}), testCheckpoint