Skip to content

Commit

Permalink
Remove hppc from translog classes
Browse files Browse the repository at this point in the history
The translog writer uses hppc maps for mapping sequence numbers to
bitsets, and keeping track of which sequence numbers have been flushed
or not. This commit converts these uses to Java collections.

relates elastic#84735
  • Loading branch information
rjernst committed Apr 21, 2022
1 parent a97bd33 commit 017268c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@

package org.elasticsearch.index.translog;

import com.carrotsearch.hppc.LongObjectHashMap;

import org.elasticsearch.index.seqno.CountedBitSet;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
* A snapshot composed out of multiple snapshots
Expand Down Expand Up @@ -75,19 +75,15 @@ public void close() throws IOException {

static final class SeqNoSet {
static final short BIT_SET_SIZE = 1024;
private final LongObjectHashMap<CountedBitSet> bitSets = new LongObjectHashMap<>();
private final Map<Long, CountedBitSet> bitSets = new HashMap<>();

/**
* Marks this sequence number and returns {@code true} if it is seen before.
*/
boolean getAndSet(long value) {
assert value >= 0;
final long key = value / BIT_SET_SIZE;
CountedBitSet bitset = bitSets.get(key);
if (bitset == null) {
bitset = new CountedBitSet(BIT_SET_SIZE);
bitSets.put(key, bitset);
}
CountedBitSet bitset = bitSets.computeIfAbsent(key, k -> new CountedBitSet(BIT_SET_SIZE));
final int index = Math.toIntExact(value % BIT_SET_SIZE);
final boolean wasOn = bitset.get(index);
bitset.set(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

package org.elasticsearch.index.translog;

import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.procedures.LongProcedure;

import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
Expand All @@ -37,7 +34,9 @@
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -75,7 +74,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
// lock order synchronized(syncLock) -> try(Releasable lock = writeLock.acquire()) -> synchronized(this)
private final Object syncLock = new Object();

private LongArrayList nonFsyncedSequenceNumbers = new LongArrayList(64);
private List<Long> nonFsyncedSequenceNumbers = new ArrayList<>(64);
private final int forceWriteThreshold;
private volatile long bufferedBytes;
private ReleasableBytesStreamOutput buffer;
Expand Down Expand Up @@ -456,7 +455,7 @@ final boolean syncUpTo(long offset) throws IOException {
// double checked locking - we don't want to fsync unless we have to and now that we have
// the lock we should check again since if this code is busy we might have fsynced enough already
final Checkpoint checkpointToSync;
final LongArrayList flushedSequenceNumbers;
final List<Long> flushedSequenceNumbers;
final ReleasableBytesReference toWrite;
try (ReleasableLock toClose = writeLock.acquire()) {
synchronized (this) {
Expand All @@ -467,7 +466,7 @@ final boolean syncUpTo(long offset) throws IOException {
flushedSequenceNumbers = null;
} else {
flushedSequenceNumbers = nonFsyncedSequenceNumbers;
nonFsyncedSequenceNumbers = new LongArrayList(64);
nonFsyncedSequenceNumbers = new ArrayList<>(64);
}
}

Expand All @@ -493,7 +492,7 @@ final boolean syncUpTo(long offset) throws IOException {
throw ex;
}
if (flushedSequenceNumbers != null) {
flushedSequenceNumbers.forEach((LongProcedure) persistedSequenceNumberConsumer::accept);
flushedSequenceNumbers.forEach(persistedSequenceNumberConsumer::accept);
}
assert lastSyncedCheckpoint.offset <= checkpointToSync.offset
: "illegal state: " + lastSyncedCheckpoint.offset + " <= " + checkpointToSync.offset;
Expand Down

0 comments on commit 017268c

Please sign in to comment.