From ed4fdebf88ed21f1cbf266aff79c3d54c8c4968a Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 9 Mar 2023 10:54:19 -0700 Subject: [PATCH] Segment Replication - Remove seqNo field from ReplicationCheckpoint and use UserData to transfer state. (#6594) * Segment Replication - Fix incorrect maxSeqNo computation. This change updates getLatestSegmentInfos to only return the max seqNo from the previous commit point. This is the only way to guarantee that every operation up to this seqNo has made it into the commit point. Signed-off-by: Marc Handalian * Remove unnecessary seqNo field from ReplicationCheckpoint. Signed-off-by: Marc Handalian --------- Signed-off-by: Marc Handalian (cherry picked from commit f4739bb8757d0153d01e83b3aaf8b76724ba3b04) Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationIT.java | 64 ++++++++++++++++++- .../index/engine/NRTReplicationEngine.java | 9 ++- .../opensearch/index/shard/IndexShard.java | 29 ++++----- .../replication/SegmentReplicationTarget.java | 2 +- .../checkpoint/ReplicationCheckpoint.java | 19 +----- .../gateway/PrimaryShardAllocatorTests.java | 28 ++++---- .../engine/NRTReplicationEngineTests.java | 6 +- .../SegmentReplicationIndexShardTests.java | 5 +- .../OngoingSegmentReplicationsTests.java | 8 +-- .../PrimaryShardReplicationSourceTests.java | 32 ++-------- .../SegmentReplicationSourceServiceTests.java | 8 +-- .../SegmentReplicationTargetServiceTests.java | 4 +- .../SegmentReplicationTargetTests.java | 10 ++- .../PublishCheckpointActionTests.java | 4 +- .../replication/common/CopyStateTests.java | 8 +-- 15 files changed, 115 insertions(+), 121 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 9975a5ff65a34..7b40824644ed8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ 
b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -32,8 +32,8 @@ import static java.util.Arrays.asList; import static org.opensearch.index.query.QueryBuilders.matchQuery; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -423,6 +423,68 @@ public void testDeleteOperations() throws Exception { } } + /** + * This tests that the max seqNo we send to replicas is accurate and that after failover + * the new primary starts indexing from the correct maxSeqNo and replays the correct count of docs + * from xlog. + */ + public void testReplicationPostDeleteAndForceMerge() throws Exception { + final String primary = internalCluster().startNode(); + createIndex(INDEX_NAME); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + final int initialDocCount = scaledRandomIntBetween(10, 200); + for (int i = 0; i < initialDocCount; i++) { + client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get(); + } + refresh(INDEX_NAME); + waitForSearchableDocs(initialDocCount, primary, replica); + + final int deletedDocCount = randomIntBetween(10, initialDocCount); + for (int i = 0; i < deletedDocCount; i++) { + client(primary).prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + } + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + + // randomly flush here after the force merge to wipe any old segments. 
+ if (randomBoolean()) { + flush(INDEX_NAME); + } + + final IndexShard primaryShard = getIndexShard(primary, INDEX_NAME); + final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME); + assertBusy( + () -> assertEquals( + primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(), + replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion() + ) + ); + + // add some docs to the xlog and drop primary. + final int additionalDocs = randomIntBetween(1, 50); + for (int i = initialDocCount; i < initialDocCount + additionalDocs; i++) { + client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get(); + } + // Drop the primary and wait until replica is promoted. + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + + final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica); + assertNotNull(replicaShardRouting); + assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); + refresh(INDEX_NAME); + final long expectedHitCount = initialDocCount + additionalDocs - deletedDocCount; + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + int expectedMaxSeqNo = initialDocCount + deletedDocCount + additionalDocs - 1; + assertEquals(expectedMaxSeqNo, replicaShard.seqNoStats().getMaxSeqNo()); + + // index another doc. 
+ client().prepareIndex(INDEX_NAME).setId(String.valueOf(expectedMaxSeqNo + 1)).setSource("another", "doc").get(); + refresh(INDEX_NAME); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount + 1); + } + public void testUpdateOperations() throws Exception { internalCluster().startClusterManagerOnlyNode(); final String primary = internalCluster().startDataOnlyNode(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 76461783fb724..cb5a7fbc3f181 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -42,6 +42,8 @@ import java.util.function.BiFunction; import java.util.stream.Stream; +import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; + /** * This is an {@link Engine} implementation intended for replica shards when Segment Replication * is enabled. This Engine does not create an IndexWriter, rather it refreshes a {@link NRTReplicationReaderManager} @@ -123,10 +125,11 @@ public TranslogManager translogManager() { return translogManager; } - public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { + public synchronized void updateSegments(final SegmentInfos infos) throws IOException { // Update the current infos reference on the Engine's reader. ensureOpen(); try (ReleasableLock lock = writeLock.acquire()) { + final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO)); final long incomingGeneration = infos.getGeneration(); readerManager.updateSegments(infos); @@ -134,11 +137,11 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th // lower/higher gens are possible from a new primary that was just elected. 
if (incomingGeneration != lastReceivedGen) { commitSegmentInfos(); - translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo); + translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); translogManager.rollTranslogGeneration(); } lastReceivedGen = incomingGeneration; - localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); + localCheckpointTracker.fastForwardProcessedSeqNo(maxSeqNo); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index dd180b95e6b96..d191b80d9358e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1423,9 +1423,9 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th } } - public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException { + public void finalizeReplication(SegmentInfos infos) throws IOException { if (getReplicationEngine().isPresent()) { - getReplicationEngine().get().updateSegments(infos, seqNo); + getReplicationEngine().get().updateSegments(infos); } } @@ -1478,22 +1478,15 @@ public Tuple, ReplicationCheckpoint> getLatestSegme } // do not close the snapshot - caller will close it. final GatedCloseable snapshot = getSegmentInfosSnapshot(); - return Optional.ofNullable(snapshot.get()).map(segmentInfos -> { - try { - return new Tuple<>( - snapshot, - new ReplicationCheckpoint( - this.shardId, - getOperationPrimaryTerm(), - segmentInfos.getGeneration(), - shardRouting.primary() ? 
getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos) : getProcessedLocalCheckpoint(), - segmentInfos.getVersion() - ) - ); - } catch (IOException e) { - throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e); - } - }).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); + return Optional.ofNullable(snapshot.get()).map(segmentInfos -> new Tuple<>( + snapshot, + new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + segmentInfos.getVersion() + ) + )).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 6f46fe8398388..1e36e7c155b02 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -220,7 +220,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, responseCheckpoint.getSegmentsGen() ); cancellableThreads.checkForCancel(); - indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); + indexShard.finalizeReplication(infos); store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. 
diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 6a4e5e449f178..c03473e0a8c30 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -28,7 +28,6 @@ public class ReplicationCheckpoint implements Writeable, Comparable() { @Override @@ -322,7 +320,7 @@ public void getSegmentFiles( ); segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); - doThrow(exception).when(spyIndexShard).finalizeReplication(any(), anyLong()); + doThrow(exception).when(spyIndexShard).finalizeReplication(any()); segrepTarget.startReplication(new ActionListener() { @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 704b40b05c49d..ea405f5b90c14 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -103,7 +103,7 @@ public void testPublishCheckpointActionOnPrimary() { mockTargetService ); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); expectThrows(OpenSearchException.class, () -> { action.shardOperationOnPrimary(request, indexShard, mock(ActionListener.class)); }); @@ -135,7 +135,7 @@ public void testPublishCheckpointActionOnReplica() { 
mockTargetService ); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index 6f2be9db6b2dd..6b212a7021e4a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -67,13 +67,7 @@ public static IndexShard createMockIndexShard() throws IOException { when(mockShard.store()).thenReturn(mockStore); SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); - ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint( - mockShard.shardId(), - mockShard.getOperationPrimaryTerm(), - 0L, - mockShard.getProcessedLocalCheckpoint(), - 0L - ); + ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(mockShard.shardId(), mockShard.getOperationPrimaryTerm(), 0L, 0L); final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( new GatedCloseable<>(testSegmentInfos, () -> {}), testCheckpoint