diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 65356d959f189..50d0f62b8a79d 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2948,7 +2948,7 @@ public final long lastRefreshedCheckpoint() { * Returns the current local checkpoint getting refreshed internally. */ public final long currentOngoingRefreshCheckpoint() { - return lastRefreshedCheckpointListener.pendingCheckpoint; + return lastRefreshedCheckpointListener.pendingCheckpoint.get(); } private final Object refreshIfNeededMutex = new Object(); @@ -2968,29 +2968,33 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) { private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener { final AtomicLong refreshedCheckpoint; - volatile long pendingCheckpoint; + volatile AtomicLong pendingCheckpoint; LastRefreshedCheckpointListener(long initialLocalCheckpoint) { this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint); - this.pendingCheckpoint = initialLocalCheckpoint; + this.pendingCheckpoint = new AtomicLong(initialLocalCheckpoint); } @Override public void beforeRefresh() { // all changes until this point should be visible after refresh - pendingCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); + pendingCheckpoint.updateAndGet(curr -> Math.max(curr, localCheckpointTracker.getProcessedCheckpoint())); } @Override public void afterRefresh(boolean didRefresh) { if (didRefresh) { - updateRefreshedCheckpoint(pendingCheckpoint); + updateRefreshedCheckpoint(pendingCheckpoint.get()); } } void updateRefreshedCheckpoint(long checkpoint) { refreshedCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint)); assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint; + // This shouldn't be required ideally, but we're also invoking this method from refresh as of now. + // This change is added as safety check to ensure that our checkpoint values are consistent at all times. + pendingCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint)); + } }