diff --git a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 9fad96940b87b..87895ca7fe4ad 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -158,6 +158,25 @@ public synchronized void waitForOpsToComplete(final long seqNo) throws Interrupt } } + /** + * Checks if the given sequence number was marked as completed in this tracker. + */ + public boolean contains(final long seqNo) { + assert seqNo >= 0 : "invalid seq_no=" + seqNo; + if (seqNo >= nextSeqNo) { + return false; + } + if (seqNo <= checkpoint) { + return true; + } + final long bitSetKey = getBitSetKey(seqNo); + final CountedBitSet bitSet; + synchronized (this) { + bitSet = processedSeqNo.get(bitSetKey); + } + return bitSet != null && bitSet.get(seqNoToBitSetOffset(seqNo)); + } + /** * Moves the checkpoint to the last consecutively processed sequence number. This method assumes that the sequence number following the * current checkpoint is processed. @@ -206,7 +225,6 @@ private long lastSeqNoInBitSet(final long bitSetKey) { * @return the bit set corresponding to the provided sequence number */ private long getBitSetKey(final long seqNo) { - assert Thread.holdsLock(this); return seqNo / BIT_SET_SIZE; } @@ -232,7 +250,6 @@ private CountedBitSet getBitSetForSeqNo(final long seqNo) { * @return the position in the bit set corresponding to the provided sequence number */ private int seqNoToBitSetOffset(final long seqNo) { - assert Thread.holdsLock(this); return Math.toIntExact(seqNo % BIT_SET_SIZE); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index 932fb71790800..aef31a16110a8 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -65,24 +65,36 @@ public void testSimplePrimary() { assertThat(seqNo1, equalTo(0L)); tracker.markSeqNoAsCompleted(seqNo1); assertThat(tracker.getCheckpoint(), equalTo(0L)); + assertThat(tracker.contains(0L), equalTo(true)); + assertThat(tracker.contains(atLeast(1)), equalTo(false)); seqNo1 = tracker.generateSeqNo(); seqNo2 = tracker.generateSeqNo(); assertThat(seqNo1, equalTo(1L)); assertThat(seqNo2, equalTo(2L)); tracker.markSeqNoAsCompleted(seqNo2); assertThat(tracker.getCheckpoint(), equalTo(0L)); + assertThat(tracker.contains(seqNo1), equalTo(false)); + assertThat(tracker.contains(seqNo2), equalTo(true)); tracker.markSeqNoAsCompleted(seqNo1); assertThat(tracker.getCheckpoint(), equalTo(2L)); + assertThat(tracker.contains(between(0, 2)), equalTo(true)); + assertThat(tracker.contains(atLeast(3)), equalTo(false)); } public void testSimpleReplica() { assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + assertThat(tracker.contains(randomNonNegativeLong()), equalTo(false)); tracker.markSeqNoAsCompleted(0L); assertThat(tracker.getCheckpoint(), equalTo(0L)); + assertThat(tracker.contains(0), equalTo(true)); tracker.markSeqNoAsCompleted(2L); assertThat(tracker.getCheckpoint(), equalTo(0L)); + assertThat(tracker.contains(1L), equalTo(false)); + assertThat(tracker.contains(2L), equalTo(true)); tracker.markSeqNoAsCompleted(1L); assertThat(tracker.getCheckpoint(), equalTo(2L)); + assertThat(tracker.contains(between(0, 2)), equalTo(true)); + assertThat(tracker.contains(atLeast(3)), equalTo(false)); } public void testLazyInitialization() { @@ -90,20 +102,24 @@ public void testLazyInitialization() { * Previously this would allocate the entire chain of bit sets to the one for the sequence number being marked; for very large * sequence numbers this could lead to excessive memory usage resulting in out of memory errors. */ - tracker.markSeqNoAsCompleted(randomNonNegativeLong()); + long seqNo = randomNonNegativeLong(); + tracker.markSeqNoAsCompleted(seqNo); + assertThat(tracker.processedSeqNo.size(), equalTo(1)); + assertThat(tracker.contains(seqNo), equalTo(true)); + assertThat(tracker.contains(randomValueOtherThan(seqNo, ESTestCase::randomNonNegativeLong)), equalTo(false)); assertThat(tracker.processedSeqNo.size(), equalTo(1)); } public void testSimpleOverFlow() { - List seqNoList = new ArrayList<>(); + List seqNoList = new ArrayList<>(); final boolean aligned = randomBoolean(); final int maxOps = BIT_SET_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_SET_SIZE - 1)); - for (int i = 0; i < maxOps; i++) { + for (long i = 0; i < maxOps; i++) { seqNoList.add(i); } Collections.shuffle(seqNoList, random()); - for (Integer seqNo : seqNoList) { + for (Long seqNo : seqNoList) { tracker.markSeqNoAsCompleted(seqNo); } assertThat(tracker.checkpoint, equalTo(maxOps - 1L)); @@ -111,6 +127,9 @@ public void testSimpleOverFlow() { if (aligned == false) { assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE)); } + assertThat(tracker.contains(randomFrom(seqNoList)), equalTo(true)); + final long notCompletedSeqNo = randomValueOtherThanMany(seqNoList::contains, ESTestCase::randomNonNegativeLong); + assertThat(tracker.contains(notCompletedSeqNo), equalTo(false)); } public void testConcurrentPrimary() throws InterruptedException { @@ -199,8 +218,12 @@ protected void doRun() throws Exception { } assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L)); assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L)); + assertThat(tracker.contains(randomValueOtherThan(unFinishedSeq, () -> (long) randomFrom(seqNos))), equalTo(true)); + assertThat(tracker.contains(unFinishedSeq), equalTo(false)); tracker.markSeqNoAsCompleted(unFinishedSeq); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); + assertThat(tracker.contains(unFinishedSeq), equalTo(true)); + assertThat(tracker.contains(randomLongBetween(maxOps, Long.MAX_VALUE)), equalTo(false)); assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1)); if (tracker.processedSeqNo.size() == 1) { assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE)); @@ -272,4 +295,23 @@ public void describeTo(Description description) { }); assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1))); } + + public void testContains() { + final long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, 100); + final long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo); + final LocalCheckpointTracker tracker = new LocalCheckpointTracker(maxSeqNo, localCheckpoint); + if (localCheckpoint >= 0) { + assertThat(tracker.contains(randomLongBetween(0, localCheckpoint)), equalTo(true)); + } + assertThat(tracker.contains(randomLongBetween(localCheckpoint + 1, Long.MAX_VALUE)), equalTo(false)); + final int numOps = between(1, 100); + final List seqNos = new ArrayList<>(); + for (int i = 0; i < numOps; i++) { + long seqNo = randomLongBetween(0, 1000); + seqNos.add(seqNo); + tracker.markSeqNoAsCompleted(seqNo); + } + final long seqNo = randomNonNegativeLong(); + assertThat(tracker.contains(seqNo), equalTo(seqNo <= localCheckpoint || seqNos.contains(seqNo))); + } }