Skip to content

Commit

Permalink
Remove unnecessary seqNo field from ReplicationCheckpoint.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Mar 9, 2023
1 parent b0c674c commit 008d466
Show file tree
Hide file tree
Showing 15 changed files with 56 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception {
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

int expectedMaxSeqNo = initialDocCount + deletedDocCount + additionalDocs - 1;
assertEquals(expectedMaxSeqNo, replicaShard.getLatestReplicationCheckpoint().getSeqNo());
assertEquals(expectedMaxSeqNo, replicaShard.seqNoStats().getMaxSeqNo());

// index another doc.
client().prepareIndex(INDEX_NAME).setId(String.valueOf(expectedMaxSeqNo + 1)).setSource("another", "doc").get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;

import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;

/**
* This is an {@link Engine} implementation intended for replica shards when Segment Replication
* is enabled. This Engine does not create an IndexWriter, rather it refreshes a {@link NRTReplicationReaderManager}
Expand Down Expand Up @@ -122,22 +124,23 @@ public TranslogManager translogManager() {
return translogManager;
}

public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
public synchronized void updateSegments(final SegmentInfos infos) throws IOException {
// Update the current infos reference on the Engine's reader.
ensureOpen();
try (ReleasableLock lock = writeLock.acquire()) {
final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO));
final long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);

// Commit and roll the translog when we receive a different generation than what was last received.
// lower/higher gens are possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
commitSegmentInfos();
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo);
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
translogManager.rollTranslogGeneration();
}
lastReceivedGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
localCheckpointTracker.fastForwardProcessedSeqNo(maxSeqNo);
}
}

Expand Down
26 changes: 14 additions & 12 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1426,9 +1426,9 @@ private Optional<NRTReplicationEngine> getReplicationEngine() {
}
}

public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException {
public void finalizeReplication(SegmentInfos infos) throws IOException {
if (getReplicationEngine().isPresent()) {
getReplicationEngine().get().updateSegments(infos, seqNo);
getReplicationEngine().get().updateSegments(infos);
}
}

Expand Down Expand Up @@ -1481,17 +1481,19 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
}
// do not close the snapshot - caller will close it.
final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
return Optional.ofNullable(snapshot.get()).map(segmentInfos -> new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
// fetch local cp from the previous commit, current local checkpoint may be ahead already.
Long.parseLong(segmentInfos.userData.get(MAX_SEQ_NO)),
segmentInfos.getVersion()
return Optional.ofNullable(snapshot.get())
.map(
segmentInfos -> new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
segmentInfos.getVersion()
)
)
)
)).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
.orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
responseCheckpoint.getSegmentsGen()
);
cancellableThreads.checkForCancel();
indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo());
indexShard.finalizeReplication(infos);
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationC
private final ShardId shardId;
private final long primaryTerm;
private final long segmentsGen;
private final long seqNo;
private final long segmentInfosVersion;

public static ReplicationCheckpoint empty(ShardId shardId) {
Expand All @@ -39,23 +38,20 @@ private ReplicationCheckpoint(ShardId shardId) {
this.shardId = shardId;
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
segmentsGen = SequenceNumbers.NO_OPS_PERFORMED;
seqNo = SequenceNumbers.NO_OPS_PERFORMED;
segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED;
}

public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long segmentInfosVersion) {
public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion) {
this.shardId = shardId;
this.primaryTerm = primaryTerm;
this.segmentsGen = segmentsGen;
this.seqNo = seqNo;
this.segmentInfosVersion = segmentInfosVersion;
}

public ReplicationCheckpoint(StreamInput in) throws IOException {
shardId = new ShardId(in);
primaryTerm = in.readLong();
segmentsGen = in.readLong();
seqNo = in.readLong();
segmentInfosVersion = in.readLong();
}

Expand All @@ -82,13 +78,6 @@ public long getSegmentInfosVersion() {
return segmentInfosVersion;
}

/**
* @return the Seq number
*/
public long getSeqNo() {
return seqNo;
}

/**
* Shard Id of primary shard.
*
Expand All @@ -103,7 +92,6 @@ public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeLong(primaryTerm);
out.writeLong(segmentsGen);
out.writeLong(seqNo);
out.writeLong(segmentInfosVersion);
}

Expand All @@ -119,14 +107,13 @@ public boolean equals(Object o) {
ReplicationCheckpoint that = (ReplicationCheckpoint) o;
return primaryTerm == that.primaryTerm
&& segmentsGen == that.segmentsGen
&& seqNo == that.seqNo
&& segmentInfosVersion == that.segmentInfosVersion
&& Objects.equals(shardId, that.shardId);
}

@Override
public int hashCode() {
return Objects.hash(shardId, primaryTerm, segmentsGen, seqNo);
return Objects.hash(shardId, primaryTerm, segmentsGen);
}

/**
Expand All @@ -148,8 +135,6 @@ public String toString() {
+ primaryTerm
+ ", segmentsGen="
+ segmentsGen
+ ", seqNo="
+ seqNo
+ ", version="
+ segmentInfosVersion
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ public void testPreferReplicaWithHighestPrimaryTerm() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -253,9 +253,9 @@ public void testPreferReplicaWithNullReplicationCheckpoint() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 10, 101, 1));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1));
testAllocator.addData(node2, allocId2, false);
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -319,9 +319,9 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 3));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 3));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -351,9 +351,9 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() {
allocId1,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -384,9 +384,9 @@ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 10, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 10, 120, 2));
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept
// flush the primary engine - we don't need any segments, just force a new commit point.
engine.flush(true, true);
assertEquals(3, engine.getLatestSegmentInfos().getGeneration());
nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint());
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration());
}
Expand All @@ -147,7 +147,7 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti
// update the replica with segments_2 from the primary.
final SegmentInfos primaryInfos = engine.getLatestSegmentInfos();
assertEquals(2, primaryInfos.getGeneration());
nrtEngine.updateSegments(primaryInfos, engine.getProcessedLocalCheckpoint());
nrtEngine.updateSegments(primaryInfos);
assertEquals(4, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(4, nrtEngine.getLatestSegmentInfos().getGeneration());
assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion());
Expand Down Expand Up @@ -175,7 +175,7 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep
// update replica with the latest primary infos, it will be the same gen, segments_2, ensure it is also committed.
final SegmentInfos primaryInfos = engine.getLatestSegmentInfos();
assertEquals(2, primaryInfos.getGeneration());
nrtEngine.updateSegments(primaryInfos, engine.getProcessedLocalCheckpoint());
nrtEngine.updateSegments(primaryInfos);
final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion());
assertEquals(primaryInfos.getVersion(), lastCommittedSegmentInfos.getVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception {
private void assertReplicationCheckpoint(IndexShard shard, SegmentInfos segmentInfos, ReplicationCheckpoint checkpoint)
throws IOException {
assertNotNull(segmentInfos);
assertEquals(checkpoint.getSeqNo(), shard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos));
assertEquals(checkpoint.getSegmentInfosVersion(), segmentInfos.getVersion());
assertEquals(checkpoint.getSegmentsGen(), segmentInfos.getGeneration());
}
Expand Down Expand Up @@ -308,7 +307,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode());
assertEquals(true, primaryShard.routingEntry().primary());

spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, 0L), spyShard);
spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any(), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,7 @@ public void setUp() throws Exception {
ShardId testShardId = primary.shardId();

// This mirrors the creation of the ReplicationCheckpoint inside CopyState
testCheckpoint = new ReplicationCheckpoint(
testShardId,
primary.getOperationPrimaryTerm(),
0L,
primary.getProcessedLocalCheckpoint(),
0L
);
testCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L);
IndexService mockIndexService = mock(IndexService.class);
when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService);
when(mockIndexService.getShard(testShardId.id())).thenReturn(primary);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,7 @@ public void tearDown() throws Exception {
}

public void testGetCheckpointMetadata() {
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
PRIMARY_TERM,
SEGMENTS_GEN,
SEQ_NO,
VERSION
);
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION);
replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, mock(ActionListener.class));
CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear();
assertEquals(1, requestList.length);
Expand All @@ -110,13 +104,7 @@ public void testGetCheckpointMetadata() {
}

public void testGetSegmentFiles() {
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
PRIMARY_TERM,
SEGMENTS_GEN,
SEQ_NO,
VERSION
);
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION);
StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST);
replicationSource.getSegmentFiles(
REPLICATION_ID,
Expand All @@ -138,13 +126,7 @@ public void testGetSegmentFiles() {
*/
public void testTransportTimeoutForGetSegmentFilesAction() {
long fileSize = (long) (Math.pow(10, 9));
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
PRIMARY_TERM,
SEGMENTS_GEN,
SEQ_NO,
VERSION
);
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION);
StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", fileSize, "checksum", Version.LATEST);
replicationSource.getSegmentFiles(
REPLICATION_ID,
Expand All @@ -163,13 +145,7 @@ public void testTransportTimeoutForGetSegmentFilesAction() {

public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
PRIMARY_TERM,
SEGMENTS_GEN,
SEQ_NO,
VERSION
);
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION);
StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST);
replicationSource.getSegmentFiles(
REPLICATION_ID,
Expand Down
Loading

0 comments on commit 008d466

Please sign in to comment.