Skip to content

Commit

Permalink
Segment Replication - Remove seqNo field from ReplicationCheckpoint a…
Browse files Browse the repository at this point in the history
…nd use UserData to transfer state. (#6594)

* Segment Replication - Fix incorrect maxSeqNo computation.

This change updates getLatestSegmentInfos to return only the max seqNo from the previous
commit point. This is the only way to guarantee that every operation up to this seqNo has made it into the commit point.

Signed-off-by: Marc Handalian <[email protected]>

* Remove unnecessary seqNo field from ReplicationCheckpoint.

Signed-off-by: Marc Handalian <[email protected]>

---------

Signed-off-by: Marc Handalian <[email protected]>
(cherry picked from commit f4739bb)
  • Loading branch information
mch2 committed Mar 9, 2023
1 parent d9d1dc8 commit 2efcb0c
Show file tree
Hide file tree
Showing 15 changed files with 114 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@

import static java.util.Arrays.asList;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -423,6 +423,68 @@ public void testDeleteOperations() throws Exception {
}
}

/**
* This tests that the max seqNo we send to replicas is accurate and that after failover
* the new primary starts indexing from the correct maxSeqNo and replays the correct count of docs
* from xlog.
* <p>
* Scenario: index a random number of docs, delete a random prefix of them, force merge down to one
* segment (optionally flushing afterwards), wait until both shards report the same SegmentInfos version,
* index a few more docs, then stop the node referred to as {@code primary}. The remaining shard copy is
* then asserted to be a primary, to return the expected hit count, to report the expected max seqNo,
* and to accept one more document.
*
* @throws Exception if a cluster operation fails or an assertion inside {@code assertBusy} never holds
*/
public void testReplicationPostDeleteAndForceMerge() throws Exception {
// The index is created while only the first node exists and the second node is started afterwards;
// the variable names reflect the expectation that the first node hosts the primary shard and the
// second one the replica (index settings are defined outside this method - verify there).
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
// Index docs with ids "0" .. "initialDocCount - 1".
final int initialDocCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}
refresh(INDEX_NAME);
// Helper defined elsewhere; by its name it blocks until both nodes can search all initial docs.
waitForSearchableDocs(initialDocCount, primary, replica);

// Delete ids "0" .. "deletedDocCount - 1" through the primary node's client, refreshing after each delete.
// The lower bound of 10 never exceeds initialDocCount because initialDocCount is drawn from [10, 200].
final int deletedDocCount = randomIntBetween(10, initialDocCount);
for (int i = 0; i < deletedDocCount; i++) {
client(primary).prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
}
// Merge down to a single segment, explicitly without flushing as part of the force merge.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();

// randomly flush here after the force merge to wipe any old segments.
if (randomBoolean()) {
flush(INDEX_NAME);
}

final IndexShard primaryShard = getIndexShard(primary, INDEX_NAME);
final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME);
// Wait until the replica's latest replication checkpoint carries the same SegmentInfos version as the
// primary's, i.e. the post-merge segments have reached the replica before the primary is stopped.
assertBusy(
() -> assertEquals(
primaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
replicaShard.getLatestReplicationCheckpoint().getSegmentInfosVersion()
)
);

// add some docs to the xlog and drop primary.
// Ids continue after the initial range; no refresh or flush is issued for these docs before the node stops.
final int additionalDocs = randomIntBetween(1, 50);
for (int i = initialDocCount; i < initialDocCount + additionalDocs; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}
// Drop the primary and wait until replica is promoted.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
ensureYellowAndNoInitializingShards(INDEX_NAME);

final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica);
assertNotNull(replicaShardRouting);
assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
refresh(INDEX_NAME);
// Live docs = everything indexed (initial + additional) minus the deleted ones; searched on the local copy only.
final long expectedHitCount = initialDocCount + additionalDocs - deletedDocCount;
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

// The test counts one seqNo per index and per delete operation, with numbering starting at 0,
// hence the max seqNo is the total operation count minus one.
int expectedMaxSeqNo = initialDocCount + deletedDocCount + additionalDocs - 1;
assertEquals(expectedMaxSeqNo, replicaShard.seqNoStats().getMaxSeqNo());

// index another doc.
// The id (expectedMaxSeqNo + 1) is larger than any id used above, so this creates a new document
// rather than overwriting one, and the hit count must grow by exactly one.
client().prepareIndex(INDEX_NAME).setId(String.valueOf(expectedMaxSeqNo + 1)).setSource("another", "doc").get();
refresh(INDEX_NAME);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount + 1);
}

public void testUpdateOperations() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.util.function.BiFunction;
import java.util.stream.Stream;

import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;

/**
* This is an {@link Engine} implementation intended for replica shards when Segment Replication
* is enabled. This Engine does not create an IndexWriter, rather it refreshes a {@link NRTReplicationReaderManager}
Expand Down Expand Up @@ -123,22 +125,23 @@ public TranslogManager translogManager() {
return translogManager;
}

public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
public synchronized void updateSegments(final SegmentInfos infos) throws IOException {
// Update the current infos reference on the Engine's reader.
ensureOpen();
try (ReleasableLock lock = writeLock.acquire()) {
final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO));
final long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);

// Commit and roll the translog when we receive a different generation than what was last received.
// lower/higher gens are possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
commitSegmentInfos();
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo);
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
translogManager.rollTranslogGeneration();
}
lastReceivedGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
localCheckpointTracker.fastForwardProcessedSeqNo(maxSeqNo);
}
}

Expand Down
27 changes: 10 additions & 17 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,7 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th

public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException {
if (getReplicationEngine().isPresent()) {
getReplicationEngine().get().updateSegments(infos, seqNo);
getReplicationEngine().get().updateSegments(infos);
}
}

Expand Down Expand Up @@ -1478,22 +1478,15 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
}
// do not close the snapshot - caller will close it.
final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
return Optional.ofNullable(snapshot.get()).map(segmentInfos -> {
try {
return new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
shardRouting.primary() ? getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos) : getProcessedLocalCheckpoint(),
segmentInfos.getVersion()
)
);
} catch (IOException e) {
throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e);
}
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
return Optional.ofNullable(snapshot.get()).map(segmentInfos -> new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
segmentInfos.getVersion()
)
)).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
responseCheckpoint.getSegmentsGen()
);
cancellableThreads.checkForCancel();
indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo());
indexShard.finalizeReplication(infos);
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationC
private final ShardId shardId;
private final long primaryTerm;
private final long segmentsGen;
private final long seqNo;
private final long segmentInfosVersion;

public static ReplicationCheckpoint empty(ShardId shardId) {
Expand All @@ -39,23 +38,20 @@ private ReplicationCheckpoint(ShardId shardId) {
this.shardId = shardId;
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
segmentsGen = SequenceNumbers.NO_OPS_PERFORMED;
seqNo = SequenceNumbers.NO_OPS_PERFORMED;
segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED;
}

public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long segmentInfosVersion) {
public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion) {
this.shardId = shardId;
this.primaryTerm = primaryTerm;
this.segmentsGen = segmentsGen;
this.seqNo = seqNo;
this.segmentInfosVersion = segmentInfosVersion;
}

public ReplicationCheckpoint(StreamInput in) throws IOException {
shardId = new ShardId(in);
primaryTerm = in.readLong();
segmentsGen = in.readLong();
seqNo = in.readLong();
segmentInfosVersion = in.readLong();
}

Expand All @@ -82,13 +78,6 @@ public long getSegmentInfosVersion() {
return segmentInfosVersion;
}

/**
* @return the Seq number
*/
public long getSeqNo() {
return seqNo;
}

/**
* Shard Id of primary shard.
*
Expand All @@ -103,7 +92,6 @@ public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeLong(primaryTerm);
out.writeLong(segmentsGen);
out.writeLong(seqNo);
out.writeLong(segmentInfosVersion);
}

Expand All @@ -119,14 +107,13 @@ public boolean equals(Object o) {
ReplicationCheckpoint that = (ReplicationCheckpoint) o;
return primaryTerm == that.primaryTerm
&& segmentsGen == that.segmentsGen
&& seqNo == that.seqNo
&& segmentInfosVersion == that.segmentInfosVersion
&& Objects.equals(shardId, that.shardId);
}

@Override
public int hashCode() {
return Objects.hash(shardId, primaryTerm, segmentsGen, seqNo);
return Objects.hash(shardId, primaryTerm, segmentsGen);
}

/**
Expand All @@ -148,8 +135,6 @@ public String toString() {
+ primaryTerm
+ ", segmentsGen="
+ segmentsGen
+ ", seqNo="
+ seqNo
+ ", version="
+ segmentInfosVersion
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ public void testPreferReplicaWithHighestPrimaryTerm() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -253,9 +253,9 @@ public void testPreferReplicaWithNullReplicationCheckpoint() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1));
testAllocator.addData(node2, allocId2, false);
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -319,9 +319,9 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 3));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 3));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -351,9 +351,9 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() {
allocId1,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -384,9 +384,9 @@ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2));
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept
// flush the primary engine - we don't need any segments, just force a new commit point.
engine.flush(true, true);
assertEquals(3, engine.getLatestSegmentInfos().getGeneration());
nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint());
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration());
}
Expand All @@ -144,7 +144,7 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti
// update the replica with segments_2 from the primary.
final SegmentInfos primaryInfos = engine.getLatestSegmentInfos();
assertEquals(2, primaryInfos.getGeneration());
nrtEngine.updateSegments(primaryInfos, engine.getProcessedLocalCheckpoint());
nrtEngine.updateSegments(primaryInfos);
assertEquals(4, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(4, nrtEngine.getLatestSegmentInfos().getGeneration());
assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion());
Expand Down Expand Up @@ -172,7 +172,7 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep
// update replica with the latest primary infos, it will be the same gen, segments_2, ensure it is also committed.
final SegmentInfos primaryInfos = engine.getLatestSegmentInfos();
assertEquals(2, primaryInfos.getGeneration());
nrtEngine.updateSegments(primaryInfos, engine.getProcessedLocalCheckpoint());
nrtEngine.updateSegments(primaryInfos);
final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion());
assertEquals(primaryInfos.getVersion(), lastCommittedSegmentInfos.getVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception {
private void assertReplicationCheckpoint(IndexShard shard, SegmentInfos segmentInfos, ReplicationCheckpoint checkpoint)
throws IOException {
assertNotNull(segmentInfos);
assertEquals(checkpoint.getSeqNo(), shard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos));
assertEquals(checkpoint.getSegmentInfosVersion(), segmentInfos.getVersion());
assertEquals(checkpoint.getSegmentsGen(), segmentInfos.getGeneration());
}
Expand Down Expand Up @@ -308,7 +307,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode());
assertEquals(true, primaryShard.routingEntry().primary());

spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, 0L), spyShard);
spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any(), any());
Expand Down Expand Up @@ -1024,8 +1023,6 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun
// assigned seqNos start at 0, so assert max & local seqNos are 1 less than our persisted doc count.
assertEquals(expectedPersistedDocCount - 1, indexShard.seqNoStats().getMaxSeqNo());
assertEquals(expectedPersistedDocCount - 1, indexShard.seqNoStats().getLocalCheckpoint());
// processed cp should be 1 less than our searchable doc count.
assertEquals(expectedSearchableDocCount - 1, indexShard.getProcessedLocalCheckpoint());
}

private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
Expand Down
Loading

0 comments on commit 2efcb0c

Please sign in to comment.