diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index fbdd3598facde..690e78d78a6c6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -476,7 +476,7 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception { assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); int expectedMaxSeqNo = initialDocCount + deletedDocCount + additionalDocs - 1; - assertEquals(expectedMaxSeqNo, replicaShard.getLatestReplicationCheckpoint().getSeqNo()); + assertEquals(expectedMaxSeqNo, replicaShard.seqNoStats().getMaxSeqNo()); // index another doc. client().prepareIndex(INDEX_NAME).setId(String.valueOf(expectedMaxSeqNo + 1)).setSource("another", "doc").get(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 1d28921fdf2a6..5fd49813bb849 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -40,6 +40,8 @@ import java.util.concurrent.CountDownLatch; import java.util.function.BiFunction; +import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; + /** * This is an {@link Engine} implementation intended for replica shards when Segment Replication * is enabled. This Engine does not create an IndexWriter, rather it refreshes a {@link NRTReplicationReaderManager} @@ -122,10 +124,11 @@ public TranslogManager translogManager() { return translogManager; } - public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { + public synchronized void updateSegments(final SegmentInfos infos) throws IOException { // Update the current infos reference on the Engine's reader. ensureOpen(); try (ReleasableLock lock = writeLock.acquire()) { + final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO)); final long incomingGeneration = infos.getGeneration(); readerManager.updateSegments(infos); @@ -133,11 +136,11 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th // lower/higher gens are possible from a new primary that was just elected. if (incomingGeneration != lastReceivedGen) { commitSegmentInfos(); - translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo); + translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); translogManager.rollTranslogGeneration(); } lastReceivedGen = incomingGeneration; - localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); + localCheckpointTracker.fastForwardProcessedSeqNo(maxSeqNo); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f752a842d6738..546affcf48bd2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1426,9 +1426,9 @@ private Optional getReplicationEngine() { } } - public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException { + public void finalizeReplication(SegmentInfos infos) throws IOException { if (getReplicationEngine().isPresent()) { - getReplicationEngine().get().updateSegments(infos, seqNo); + getReplicationEngine().get().updateSegments(infos); } } @@ -1481,17 +1481,19 @@ public Tuple, ReplicationCheckpoint> getLatestSegme } // do not close the snapshot - caller will close it. final GatedCloseable snapshot = getSegmentInfosSnapshot(); - return Optional.ofNullable(snapshot.get()).map(segmentInfos -> new Tuple<>( - snapshot, - new ReplicationCheckpoint( - this.shardId, - getOperationPrimaryTerm(), - segmentInfos.getGeneration(), - // fetch local cp from the previous commit, current local checkpoint may be ahead already. - Long.parseLong(segmentInfos.userData.get(MAX_SEQ_NO)), - segmentInfos.getVersion() + return Optional.ofNullable(snapshot.get()) + .map( + segmentInfos -> new Tuple<>( + snapshot, + new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + segmentInfos.getVersion() + ) + ) ) - )).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); + .orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 6f46fe8398388..1e36e7c155b02 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -220,7 +220,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, responseCheckpoint.getSegmentsGen() ); cancellableThreads.checkForCancel(); - indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); + indexShard.finalizeReplication(infos); store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 6a4e5e449f178..c03473e0a8c30 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -28,7 +28,6 @@ public class ReplicationCheckpoint implements Writeable, Comparable() { @Override @@ -322,7 +320,7 @@ public void getSegmentFiles( ); segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); - doThrow(exception).when(spyIndexShard).finalizeReplication(any(), anyLong()); + doThrow(exception).when(spyIndexShard).finalizeReplication(any()); segrepTarget.startReplication(new ActionListener() { @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 704b40b05c49d..ea405f5b90c14 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -103,7 +103,7 @@ public void testPublishCheckpointActionOnPrimary() { mockTargetService ); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); expectThrows(OpenSearchException.class, () -> { action.shardOperationOnPrimary(request, indexShard, mock(ActionListener.class)); }); @@ -135,7 +135,7 @@ public void testPublishCheckpointActionOnReplica() { mockTargetService ); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index 6f2be9db6b2dd..6b212a7021e4a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -67,13 +67,7 @@ public static IndexShard createMockIndexShard() throws IOException { when(mockShard.store()).thenReturn(mockStore); SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); - ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint( - mockShard.shardId(), - mockShard.getOperationPrimaryTerm(), - 0L, - mockShard.getProcessedLocalCheckpoint(), - 0L - ); + ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(mockShard.shardId(), mockShard.getOperationPrimaryTerm(), 0L, 0L); final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( new GatedCloseable<>(testSegmentInfos, () -> {}), testCheckpoint