Skip to content

Commit

Permalink
Fix flaky test InternalEngineTests.testLastRefreshCheckpoint (#9365) (#…
Browse files Browse the repository at this point in the history
…9395)

(cherry picked from commit f971e5b)

Signed-off-by: Ankit Kala <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 9ee3754 commit 6ef4893
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2948,7 +2948,7 @@ public final long lastRefreshedCheckpoint() {
* Returns the current local checkpoint getting refreshed internally.
*/
public final long currentOngoingRefreshCheckpoint() {
return lastRefreshedCheckpointListener.pendingCheckpoint;
return lastRefreshedCheckpointListener.pendingCheckpoint.get();
}

private final Object refreshIfNeededMutex = new Object();
Expand All @@ -2968,29 +2968,33 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) {

private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
final AtomicLong refreshedCheckpoint;
volatile long pendingCheckpoint;
volatile AtomicLong pendingCheckpoint;

LastRefreshedCheckpointListener(long initialLocalCheckpoint) {
this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint);
this.pendingCheckpoint = initialLocalCheckpoint;
this.pendingCheckpoint = new AtomicLong(initialLocalCheckpoint);
}

@Override
public void beforeRefresh() {
// all changes until this point should be visible after refresh
pendingCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
pendingCheckpoint.updateAndGet(curr -> Math.max(curr, localCheckpointTracker.getProcessedCheckpoint()));
}

@Override
public void afterRefresh(boolean didRefresh) {
if (didRefresh) {
updateRefreshedCheckpoint(pendingCheckpoint);
updateRefreshedCheckpoint(pendingCheckpoint.get());
}
}

void updateRefreshedCheckpoint(long checkpoint) {
refreshedCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint));
assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint;
// This shouldn't be required ideally, but we're also invoking this method from refresh as of now.
// This change is added as safety check to ensure that our checkpoint values are consistent at all times.
pendingCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint));

}
}

Expand Down

0 comments on commit 6ef4893

Please sign in to comment.