From b0c674c5868547c6bfea2d4d624212de3b629380 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 8 Mar 2023 15:42:24 -0800 Subject: [PATCH 1/2] Segment Replication - Fix incorrect maxSeqNo computation. This change updates getLatestSegmentInfos to only return the max seqNo from the previous commit point. This is the only way to guarantee that up to this seqNo has made it into the commit point. Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationIT.java | 64 ++++++++++++++++++- .../opensearch/index/shard/IndexShard.java | 27 ++++---- 2 files changed, 74 insertions(+), 17 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 7d3a2a8f69bc8..fbdd3598facde 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -32,8 +32,8 @@ import static java.util.Arrays.asList; import static org.opensearch.index.query.QueryBuilders.matchQuery; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -422,6 +422,68 @@ public void testDeleteOperations() throws Exception { } } + /** + * This tests that the max seqNo we send to replicas is accurate and that after failover + * the new primary starts indexing from the correct maxSeqNo and replays the correct count of docs + * from xlog. + */ + public void testReplicationPostDeleteAndForceMerge() throws Exception { + final String primary = internalCluster().startNode(); + createIndex(INDEX_NAME); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + final int initialDocCount = scaledRandomIntBetween(0, 200); + for (int i = 0; i < initialDocCount; i++) { + client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get(); + } + refresh(INDEX_NAME); + waitForSearchableDocs(initialDocCount, primary, replica); + + final int deletedDocCount = randomIntBetween(0, initialDocCount / 2); + for (int i = 0; i < deletedDocCount; i++) { + client(primary).prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + } + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + + // randomly flush here after the force merge to wipe any old segments. + if (randomBoolean()) { + flush(INDEX_NAME); + } + + final IndexShard primaryShard = getIndexShard(primary, INDEX_NAME); + final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME); + assertBusy( + () -> assertEquals( + primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(), + replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion() + ) + ); + + // add some docs to the xlog and drop primary. + final int additionalDocs = randomIntBetween(1, 50); + for (int i = initialDocCount; i < initialDocCount + additionalDocs; i++) { + client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get(); + } + // Drop the primary and wait until replica is promoted. + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + + final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica); + assertNotNull(replicaShardRouting); + assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); + refresh(INDEX_NAME); + final long expectedHitCount = initialDocCount + additionalDocs - deletedDocCount; + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + int expectedMaxSeqNo = initialDocCount + deletedDocCount + additionalDocs - 1; + assertEquals(expectedMaxSeqNo, replicaShard.getLatestReplicationCheckpoint().getSeqNo()); + + // index another doc. + client().prepareIndex(INDEX_NAME).setId(String.valueOf(expectedMaxSeqNo + 1)).setSource("another", "doc").get(); + refresh(INDEX_NAME); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount + 1); + } + public void testUpdateOperations() throws Exception { internalCluster().startClusterManagerOnlyNode(); final String primary = internalCluster().startDataOnlyNode(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 71ec8a4decd38..f752a842d6738 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1481,22 +1481,17 @@ public Tuple, ReplicationCheckpoint> getLatestSegme } // do not close the snapshot - caller will close it. final GatedCloseable snapshot = getSegmentInfosSnapshot(); - return Optional.ofNullable(snapshot.get()).map(segmentInfos -> { - try { - return new Tuple<>( - snapshot, - new ReplicationCheckpoint( - this.shardId, - getOperationPrimaryTerm(), - segmentInfos.getGeneration(), - shardRouting.primary() ? getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos) : getProcessedLocalCheckpoint(), - segmentInfos.getVersion() - ) - ); - } catch (IOException e) { - throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e); - } - }).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); + return Optional.ofNullable(snapshot.get()).map(segmentInfos -> new Tuple<>( + snapshot, + new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + // fetch local cp from the previous commit, current local checkpoint may be ahead already. + Long.parseLong(segmentInfos.userData.get(MAX_SEQ_NO)), + segmentInfos.getVersion() + ) + )).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); } /** From fe742ec67e6ab045d4d5ce7ebe508ed12de58e3a Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 8 Mar 2023 18:02:42 -0800 Subject: [PATCH 2/2] Remove unnecessary seqNo field from ReplicationCheckpoint. Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationIT.java | 6 ++-- .../index/engine/NRTReplicationEngine.java | 9 ++++-- .../opensearch/index/shard/IndexShard.java | 26 ++++++++------- .../replication/SegmentReplicationTarget.java | 2 +- .../checkpoint/ReplicationCheckpoint.java | 19 ++--------- .../gateway/PrimaryShardAllocatorTests.java | 28 ++++++++-------- .../engine/NRTReplicationEngineTests.java | 6 ++-- .../SegmentReplicationIndexShardTests.java | 5 +-- .../OngoingSegmentReplicationsTests.java | 8 +---- .../PrimaryShardReplicationSourceTests.java | 32 +++---------------- .../SegmentReplicationSourceServiceTests.java | 8 +---- .../SegmentReplicationTargetServiceTests.java | 4 +-- .../SegmentReplicationTargetTests.java | 10 +++--- .../PublishCheckpointActionTests.java | 4 +-- .../replication/common/CopyStateTests.java | 8 +---- 15 files changed, 58 insertions(+), 117 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index fbdd3598facde..820fa17b77c48 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -432,14 +432,14 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception { createIndex(INDEX_NAME); final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); - final int initialDocCount = scaledRandomIntBetween(0, 200); + final int initialDocCount = scaledRandomIntBetween(10, 200); for (int i = 0; i < initialDocCount; i++) { client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get(); } refresh(INDEX_NAME); waitForSearchableDocs(initialDocCount, primary, replica); - final int deletedDocCount = randomIntBetween(0, initialDocCount / 2); + final int deletedDocCount = randomIntBetween(10, initialDocCount); for (int i = 0; i < deletedDocCount; i++) { client(primary).prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); } @@ -476,7 +476,7 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception { assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); int expectedMaxSeqNo = initialDocCount + deletedDocCount + additionalDocs - 1; - assertEquals(expectedMaxSeqNo, replicaShard.getLatestReplicationCheckpoint().getSeqNo()); + assertEquals(expectedMaxSeqNo, replicaShard.seqNoStats().getMaxSeqNo()); // index another doc. client().prepareIndex(INDEX_NAME).setId(String.valueOf(expectedMaxSeqNo + 1)).setSource("another", "doc").get(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 1d28921fdf2a6..5fd49813bb849 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -40,6 +40,8 @@ import java.util.concurrent.CountDownLatch; import java.util.function.BiFunction; +import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; + /** * This is an {@link Engine} implementation intended for replica shards when Segment Replication * is enabled. This Engine does not create an IndexWriter, rather it refreshes a {@link NRTReplicationReaderManager} @@ -122,10 +124,11 @@ public TranslogManager translogManager() { return translogManager; } - public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { + public synchronized void updateSegments(final SegmentInfos infos) throws IOException { // Update the current infos reference on the Engine's reader. ensureOpen(); try (ReleasableLock lock = writeLock.acquire()) { + final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO)); final long incomingGeneration = infos.getGeneration(); readerManager.updateSegments(infos); @@ -133,11 +136,11 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th // lower/higher gens are possible from a new primary that was just elected. if (incomingGeneration != lastReceivedGen) { commitSegmentInfos(); - translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo); + translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); translogManager.rollTranslogGeneration(); } lastReceivedGen = incomingGeneration; - localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); + localCheckpointTracker.fastForwardProcessedSeqNo(maxSeqNo); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f752a842d6738..546affcf48bd2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1426,9 +1426,9 @@ private Optional getReplicationEngine() { } } - public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException { + public void finalizeReplication(SegmentInfos infos) throws IOException { if (getReplicationEngine().isPresent()) { - getReplicationEngine().get().updateSegments(infos, seqNo); + getReplicationEngine().get().updateSegments(infos); } } @@ -1481,17 +1481,19 @@ public Tuple, ReplicationCheckpoint> getLatestSegme } // do not close the snapshot - caller will close it. final GatedCloseable snapshot = getSegmentInfosSnapshot(); - return Optional.ofNullable(snapshot.get()).map(segmentInfos -> new Tuple<>( - snapshot, - new ReplicationCheckpoint( - this.shardId, - getOperationPrimaryTerm(), - segmentInfos.getGeneration(), - // fetch local cp from the previous commit, current local checkpoint may be ahead already. - Long.parseLong(segmentInfos.userData.get(MAX_SEQ_NO)), - segmentInfos.getVersion() + return Optional.ofNullable(snapshot.get()) + .map( + segmentInfos -> new Tuple<>( + snapshot, + new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + segmentInfos.getVersion() + ) + ) ) - )).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); + .orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 6f46fe8398388..1e36e7c155b02 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -220,7 +220,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, responseCheckpoint.getSegmentsGen() ); cancellableThreads.checkForCancel(); - indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); + indexShard.finalizeReplication(infos); store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 6a4e5e449f178..c03473e0a8c30 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -28,7 +28,6 @@ public class ReplicationCheckpoint implements Writeable, Comparable() { @Override @@ -322,7 +320,7 @@ public void getSegmentFiles( ); segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); - doThrow(exception).when(spyIndexShard).finalizeReplication(any(), anyLong()); + doThrow(exception).when(spyIndexShard).finalizeReplication(any()); segrepTarget.startReplication(new ActionListener() { @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 704b40b05c49d..ea405f5b90c14 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -103,7 +103,7 @@ public void testPublishCheckpointActionOnPrimary() { mockTargetService ); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); expectThrows(OpenSearchException.class, () -> { action.shardOperationOnPrimary(request, indexShard, mock(ActionListener.class)); }); @@ -135,7 +135,7 @@ public void testPublishCheckpointActionOnReplica() { mockTargetService ); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index 6f2be9db6b2dd..6b212a7021e4a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -67,13 +67,7 @@ public static IndexShard createMockIndexShard() throws IOException { when(mockShard.store()).thenReturn(mockStore); SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); - ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint( - mockShard.shardId(), - mockShard.getOperationPrimaryTerm(), - 0L, - mockShard.getProcessedLocalCheckpoint(), - 0L - ); + ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(mockShard.shardId(), mockShard.getOperationPrimaryTerm(), 0L, 0L); final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( new GatedCloseable<>(testSegmentInfos, () -> {}), testCheckpoint