Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segment Replication - Remove seqNo field from ReplicationCheckpoint and use UserData to transfer state. #6594

Merged
merged 2 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@

import static java.util.Arrays.asList;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -422,6 +422,68 @@ public void testDeleteOperations() throws Exception {
}
}

/**
* This tests that the max seqNo we send to replicas is accurate and that after failover
* the new primary starts indexing from the correct maxSeqNo and replays the correct count of docs
* from xlog.
*/
public void testReplicationPostDeleteAndForceMerge() throws Exception {
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, primary, replica);

final int deletedDocCount = randomIntBetween(10, initialDocCount);
for (int i = 0; i < deletedDocCount; i++) {
client(primary).prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
}
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();

// randomly flush here after the force merge to wipe any old segments.
if (randomBoolean()) {
flush(INDEX_NAME);
}

final IndexShard primaryShard = getIndexShard(primary, INDEX_NAME);
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
assertBusy(
() -> assertEquals(
primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion()
)
);

// add some docs to the xlog and drop primary.
final int additionalDocs = randomIntBetween(1, 50);
for (int i = initialDocCount; i < initialDocCount + additionalDocs; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}
// Drop the primary and wait until replica is promoted.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
ensureYellowAndNoInitializingShards(INDEX_NAME);

final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica);
assertNotNull(replicaShardRouting);
assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
refresh(INDEX_NAME);
final long expectedHitCount = initialDocCount + additionalDocs - deletedDocCount;
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

int expectedMaxSeqNo = initialDocCount + deletedDocCount + additionalDocs - 1;
assertEquals(expectedMaxSeqNo, replicaShard.seqNoStats().getMaxSeqNo());

// index another doc.
client().prepareIndex(INDEX_NAME).setId(String.valueOf(expectedMaxSeqNo + 1)).setSource("another", "doc").get();
refresh(INDEX_NAME);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount + 1);
}

public void testUpdateOperations() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;

import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;

/**
* This is an {@link Engine} implementation intended for replica shards when Segment Replication
* is enabled. This Engine does not create an IndexWriter, rather it refreshes a {@link NRTReplicationReaderManager}
Expand Down Expand Up @@ -122,22 +124,23 @@ public TranslogManager translogManager() {
return translogManager;
}

public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
public synchronized void updateSegments(final SegmentInfos infos) throws IOException {
// Update the current infos reference on the Engine's reader.
ensureOpen();
try (ReleasableLock lock = writeLock.acquire()) {
final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO));
final long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);

// Commit and roll the translog when we receive a different generation than what was last received.
// lower/higher gens are possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
commitSegmentInfos();
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo);
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
translogManager.rollTranslogGeneration();
}
lastReceivedGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
localCheckpointTracker.fastForwardProcessedSeqNo(maxSeqNo);
}
}

Expand Down
19 changes: 8 additions & 11 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1426,9 +1426,9 @@ private Optional<NRTReplicationEngine> getReplicationEngine() {
}
}

public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException {
public void finalizeReplication(SegmentInfos infos) throws IOException {
if (getReplicationEngine().isPresent()) {
getReplicationEngine().get().updateSegments(infos, seqNo);
getReplicationEngine().get().updateSegments(infos);
}
}

Expand Down Expand Up @@ -1481,22 +1481,19 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
}
// do not close the snapshot - caller will close it.
final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
return Optional.ofNullable(snapshot.get()).map(segmentInfos -> {
try {
return new Tuple<>(
return Optional.ofNullable(snapshot.get())
.map(
segmentInfos -> new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
shardRouting.primary() ? getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos) : getProcessedLocalCheckpoint(),
segmentInfos.getVersion()
)
);
} catch (IOException e) {
throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e);
}
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
)
)
.orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
responseCheckpoint.getSegmentsGen()
);
cancellableThreads.checkForCancel();
indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo());
indexShard.finalizeReplication(infos);
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationC
private final ShardId shardId;
private final long primaryTerm;
private final long segmentsGen;
private final long seqNo;
private final long segmentInfosVersion;

public static ReplicationCheckpoint empty(ShardId shardId) {
Expand All @@ -39,23 +38,20 @@ private ReplicationCheckpoint(ShardId shardId) {
this.shardId = shardId;
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
segmentsGen = SequenceNumbers.NO_OPS_PERFORMED;
seqNo = SequenceNumbers.NO_OPS_PERFORMED;
segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED;
}

public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long segmentInfosVersion) {
public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion) {
this.shardId = shardId;
this.primaryTerm = primaryTerm;
this.segmentsGen = segmentsGen;
this.seqNo = seqNo;
this.segmentInfosVersion = segmentInfosVersion;
}

public ReplicationCheckpoint(StreamInput in) throws IOException {
shardId = new ShardId(in);
primaryTerm = in.readLong();
segmentsGen = in.readLong();
seqNo = in.readLong();
segmentInfosVersion = in.readLong();
}

Expand All @@ -82,13 +78,6 @@ public long getSegmentInfosVersion() {
return segmentInfosVersion;
}

/**
* @return the Seq number
*/
public long getSeqNo() {
return seqNo;
}

/**
* Shard Id of primary shard.
*
Expand All @@ -103,7 +92,6 @@ public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeLong(primaryTerm);
out.writeLong(segmentsGen);
out.writeLong(seqNo);
out.writeLong(segmentInfosVersion);
}

Expand All @@ -119,14 +107,13 @@ public boolean equals(Object o) {
ReplicationCheckpoint that = (ReplicationCheckpoint) o;
return primaryTerm == that.primaryTerm
&& segmentsGen == that.segmentsGen
&& seqNo == that.seqNo
&& segmentInfosVersion == that.segmentInfosVersion
&& Objects.equals(shardId, that.shardId);
}

@Override
public int hashCode() {
return Objects.hash(shardId, primaryTerm, segmentsGen, seqNo);
return Objects.hash(shardId, primaryTerm, segmentsGen);
}

/**
Expand All @@ -148,8 +135,6 @@ public String toString() {
+ primaryTerm
+ ", segmentsGen="
+ segmentsGen
+ ", seqNo="
+ seqNo
+ ", version="
+ segmentInfosVersion
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ public void testPreferReplicaWithHighestPrimaryTerm() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -253,9 +253,9 @@ public void testPreferReplicaWithNullReplicationCheckpoint() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1));
testAllocator.addData(node2, allocId2, false);
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -319,9 +319,9 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 3));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 3));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -351,9 +351,9 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() {
allocId1,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -384,9 +384,9 @@ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2));
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept
// flush the primary engine - we don't need any segments, just force a new commit point.
engine.flush(true, true);
assertEquals(3, engine.getLatestSegmentInfos().getGeneration());
nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint());
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration());
}
Expand All @@ -147,7 +147,7 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti
// update the replica with segments_2 from the primary.
final SegmentInfos primaryInfos = engine.getLatestSegmentInfos();
assertEquals(2, primaryInfos.getGeneration());
nrtEngine.updateSegments(primaryInfos, engine.getProcessedLocalCheckpoint());
nrtEngine.updateSegments(primaryInfos);
assertEquals(4, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(4, nrtEngine.getLatestSegmentInfos().getGeneration());
assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion());
Expand Down Expand Up @@ -175,7 +175,7 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep
// update replica with the latest primary infos, it will be the same gen, segments_2, ensure it is also committed.
final SegmentInfos primaryInfos = engine.getLatestSegmentInfos();
assertEquals(2, primaryInfos.getGeneration());
nrtEngine.updateSegments(primaryInfos, engine.getProcessedLocalCheckpoint());
nrtEngine.updateSegments(primaryInfos);
final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion());
assertEquals(primaryInfos.getVersion(), lastCommittedSegmentInfos.getVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception {
private void assertReplicationCheckpoint(IndexShard shard, SegmentInfos segmentInfos, ReplicationCheckpoint checkpoint)
throws IOException {
assertNotNull(segmentInfos);
assertEquals(checkpoint.getSeqNo(), shard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos));
assertEquals(checkpoint.getSegmentInfosVersion(), segmentInfos.getVersion());
assertEquals(checkpoint.getSegmentsGen(), segmentInfos.getGeneration());
}
Expand Down Expand Up @@ -308,7 +307,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode());
assertEquals(true, primaryShard.routingEntry().primary());

spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, 0L), spyShard);
spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any(), any());
Expand Down Expand Up @@ -1024,8 +1023,6 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun
// assigned seqNos start at 0, so assert max & local seqNos are 1 less than our persisted doc count.
assertEquals(expectedPersistedDocCount - 1, indexShard.seqNoStats().getMaxSeqNo());
assertEquals(expectedPersistedDocCount - 1, indexShard.seqNoStats().getLocalCheckpoint());
// processed cp should be 1 less than our searchable doc count.
assertEquals(expectedSearchableDocCount - 1, indexShard.getProcessedLocalCheckpoint());
}

private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
Expand Down
Loading