From 91372c69d6893f89b4bedb7c442fb8f08bc17fab Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Wed, 1 Jun 2022 19:23:54 -0700 Subject: [PATCH] Removing default implementation of getProcessedLocalCheckpoint This now requires implementations in child classes. NRTReplicationEngine already has an implementation, so an @Override annotation has been added. For ReadOnlyEngine, where no processing occurs, the processed local checkpoint is expected to be equal to the persisted local checkpoint. Unit test for ReadOnlyEngine have been updated. Signed-off-by: Kartik Ganesh --- .../src/main/java/org/opensearch/index/engine/Engine.java | 5 +---- .../org/opensearch/index/engine/NRTReplicationEngine.java | 1 + .../java/org/opensearch/index/engine/ReadOnlyEngine.java | 7 +++++++ .../org/opensearch/index/engine/ReadOnlyEngineTests.java | 3 +++ 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 5dd4914770708..c6cdec328be85 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -863,10 +863,7 @@ public final CommitStats commitStats() { * @return the latest checkpoint that has been processed but not necessarily persisted. * Also see {@link #getPersistedLocalCheckpoint()} */ - public long getProcessedLocalCheckpoint() { - // default implementation - return 0L; - } + public abstract long getProcessedLocalCheckpoint(); /** * @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 106643198cc3b..e4f4bbbba8f16 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -248,6 +248,7 @@ public long getPersistedLocalCheckpoint() { return localCheckpointTracker.getPersistedCheckpoint(); } + @Override public long getProcessedLocalCheckpoint() { return localCheckpointTracker.getProcessedCheckpoint(); } diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 23a86d8da5599..6262a9269c01c 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -374,6 +374,13 @@ public long getPersistedLocalCheckpoint() { return seqNoStats.getLocalCheckpoint(); } + @Override + public long getProcessedLocalCheckpoint() { + // the read-only engine does not process checkpoints, so its + // processed checkpoint is identical to its persisted one. + return getPersistedLocalCheckpoint(); + } + @Override public SeqNoStats getSeqNoStats(long globalCheckpoint) { return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint); diff --git a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java index 2106c5e1067fb..da0db02ac402e 100644 --- a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java @@ -107,6 +107,7 @@ public void testReadOnlyEngine() throws Exception { lastSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); lastDocIds = getDocIds(engine, true); assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); for (int i = 0; i < numDocs; i++) { @@ -131,6 +132,7 @@ public void testReadOnlyEngine() throws Exception { IOUtils.close(external, internal); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); try (Engine.GetResult getResult = readOnlyEngine.get(get, readOnlyEngine::acquireSearcher)) { @@ -142,6 +144,7 @@ public void testReadOnlyEngine() throws Exception { recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); }