Skip to content

Commit

Permalink
Remove hppc from LocalCheckpointTracker (#86073)
Browse files Browse the repository at this point in the history
The LocalCheckpointTracker keeps mappings between sequence number and a
bitsets, using hppc primitive maps. This commit converts these to use
standard HashMaps.

relates #84735
  • Loading branch information
rjernst authored Apr 25, 2022
1 parent 108a54f commit e21303d
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

package org.elasticsearch.index.seqno;

import com.carrotsearch.hppc.LongObjectHashMap;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -28,13 +28,13 @@ public class LocalCheckpointTracker {
* A collection of bit sets representing processed sequence numbers. Each sequence number is mapped to a bit set by dividing by the
* bit set size.
*/
final LongObjectHashMap<CountedBitSet> processedSeqNo = new LongObjectHashMap<>();
final Map<Long, CountedBitSet> processedSeqNo = new HashMap<>();

/**
* A collection of bit sets representing durably persisted sequence numbers. Each sequence number is mapped to a bit set by dividing by
* the bit set size.
*/
final LongObjectHashMap<CountedBitSet> persistedSeqNo = new LongObjectHashMap<>();
final Map<Long, CountedBitSet> persistedSeqNo = new HashMap<>();

/**
* The current local checkpoint, i.e., all sequence numbers no more than this number have been processed.
Expand Down Expand Up @@ -109,7 +109,7 @@ public synchronized void markSeqNoAsPersisted(final long seqNo) {
markSeqNo(seqNo, persistedCheckpoint, persistedSeqNo);
}

private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final LongObjectHashMap<CountedBitSet> bitSetMap) {
private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final Map<Long, CountedBitSet> bitSetMap) {
assert Thread.holdsLock(this);
// make sure we track highest seen sequence number
advanceMaxSeqNo(seqNo);
Expand Down Expand Up @@ -188,7 +188,7 @@ public boolean hasProcessed(final long seqNo) {
* Moves the checkpoint to the last consecutively processed sequence number. This method assumes that the sequence number
* following the current checkpoint is processed.
*/
private void updateCheckpoint(AtomicLong checkPoint, LongObjectHashMap<CountedBitSet> bitSetMap) {
private void updateCheckpoint(AtomicLong checkPoint, Map<Long, CountedBitSet> bitSetMap) {
assert Thread.holdsLock(this);
assert getBitSetForSeqNo(bitSetMap, checkPoint.get() + 1).get(seqNoToBitSetOffset(checkPoint.get() + 1))
: "updateCheckpoint is called but the bit following the checkpoint is not set";
Expand Down Expand Up @@ -229,23 +229,15 @@ private static long getBitSetKey(final long seqNo) {
return seqNo / BIT_SET_SIZE;
}

private CountedBitSet getBitSetForSeqNo(final LongObjectHashMap<CountedBitSet> bitSetMap, final long seqNo) {
private CountedBitSet getBitSetForSeqNo(final Map<Long, CountedBitSet> bitSetMap, final long seqNo) {
assert Thread.holdsLock(this);
final long bitSetKey = getBitSetKey(seqNo);
final int index = bitSetMap.indexOf(bitSetKey);
final CountedBitSet bitSet;
if (bitSetMap.indexExists(index)) {
bitSet = bitSetMap.indexGet(index);
} else {
bitSet = new CountedBitSet(BIT_SET_SIZE);
bitSetMap.indexInsert(index, bitSetKey, bitSet);
}
return bitSet;
return bitSetMap.computeIfAbsent(bitSetKey, k -> new CountedBitSet(BIT_SET_SIZE));
}

/**
* Obtain the position in the bit set corresponding to the provided sequence number. The bit set corresponding to the sequence number
* can be obtained via {@link #getBitSetForSeqNo(LongObjectHashMap, long)}.
* can be obtained via {@link #getBitSetForSeqNo(Map, long)}.
*
* @param seqNo the sequence number to obtain the position for
* @return the position in the bit set corresponding to the provided sequence number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void testSimpleOverFlow() {
assertThat(tracker.processedCheckpoint.get(), equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1));
if (aligned == false) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.processedCheckpoint.get() / BIT_SET_SIZE));
assertThat(tracker.processedSeqNo.keySet().iterator().next(), equalTo(tracker.processedCheckpoint.get() / BIT_SET_SIZE));
}
assertThat(tracker.hasProcessed(randomFrom(seqNoList)), equalTo(true));
final long notCompletedSeqNo = randomValueOtherThanMany(seqNoList::contains, ESTestCase::randomNonNegativeLong);
Expand Down Expand Up @@ -191,7 +191,7 @@ protected void doRun() throws Exception {
assertThat(tracker.getProcessedCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), is(oneOf(0, 1)));
if (tracker.processedSeqNo.size() == 1) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.processedCheckpoint.get() / BIT_SET_SIZE));
assertThat(tracker.processedSeqNo.keySet().iterator().next(), equalTo(tracker.processedCheckpoint.get() / BIT_SET_SIZE));
}
}

Expand Down Expand Up @@ -245,7 +245,7 @@ protected void doRun() throws Exception {
assertThat(tracker.hasProcessed(randomLongBetween(maxOps, Long.MAX_VALUE)), equalTo(false));
assertThat(tracker.processedSeqNo.size(), is(oneOf(0, 1)));
if (tracker.processedSeqNo.size() == 1) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.processedCheckpoint.get() / BIT_SET_SIZE));
assertThat(tracker.processedSeqNo.keySet().iterator().next(), equalTo(tracker.processedCheckpoint.get() / BIT_SET_SIZE));
}
}

Expand Down

0 comments on commit e21303d

Please sign in to comment.