Skip to content

Commit

Permalink
replace backward flag to forward flag
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Aug 20, 2020
1 parent a8bdfd4 commit b124cb7
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements KeyValueIterator<K, V> {
private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
private final KeyValueIterator<KS, VS> storeIterator;
private final boolean reverse;
private final boolean forward;

AbstractMergedSortedCacheStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
final KeyValueIterator<KS, VS> storeIterator,
final boolean reverse) {
final boolean forward) {
this.cacheIterator = cacheIterator;
this.storeIterator = storeIterator;
this.reverse = reverse;
this.forward = forward;
}

abstract int compare(final Bytes cacheKey, final KS storeKey);
Expand Down Expand Up @@ -96,7 +96,7 @@ public KeyValue<K, V> next() {
private KeyValue<K, V> chooseNextValue(final Bytes nextCacheKey,
final KS nextStoreKey,
final int comparison) {
if (!reverse) {
if (forward) {
if (comparison > 0) {
return nextStoreValue(nextStoreKey);
} else if (comparison < 0) {
Expand Down Expand Up @@ -163,7 +163,7 @@ public K peekNextKey() {
private K chooseNextKey(final Bytes nextCacheKey,
final KS nextStoreKey,
final int comparison) {
if (!reverse) {
if (forward) {
if (comparison > 0) {
return deserializeStoreKey(nextStoreKey);
} else if (comparison < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
validateStoreOpen();
final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().range(from, to);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().range(cacheName, from, to);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
}

@Override
Expand All @@ -264,7 +264,7 @@ public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
validateStoreOpen();
final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().reverseRange(from, to);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().reverseRange(cacheName, from, to);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false);
}

@Override
Expand All @@ -273,7 +273,7 @@ public KeyValueIterator<Bytes, byte[]> all() {
final KeyValueIterator<Bytes, byte[]> storeIterator =
new DelegatingPeekingKeyValueIterator<>(this.name(), wrapped().all());
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().all(cacheName);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
}

@Override
Expand All @@ -282,7 +282,7 @@ public KeyValueIterator<Bytes, byte[]> reverseAll() {
final KeyValueIterator<Bytes, byte[]> storeIterator =
new DelegatingPeekingKeyValueIterator<>(this.name(), wrapped().reverseAll());
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().reverseAll(cacheName);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ public synchronized byte[] delete(final Bytes key) {

@Override
public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
return range(from, to, false);
return range(from, to, true);
}

@Override
public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from, final Bytes to) {
return range(from, to, true);
return range(from, to, false);
}

private KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to, final boolean reverse) {
private KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to, final boolean forward) {
if (from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
Expand All @@ -129,21 +129,21 @@ private KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to,

return new DelegatingPeekingKeyValueIterator<>(
name,
new InMemoryKeyValueIterator(map.subMap(from, true, to, true).keySet(), reverse));
new InMemoryKeyValueIterator(map.subMap(from, true, to, true).keySet(), forward));
}

@Override
public synchronized KeyValueIterator<Bytes, byte[]> all() {
return new DelegatingPeekingKeyValueIterator<>(
name,
new InMemoryKeyValueIterator(map.keySet(), false));
new InMemoryKeyValueIterator(map.keySet(), true));
}

@Override
public KeyValueIterator<Bytes, byte[]> reverseAll() {
return new DelegatingPeekingKeyValueIterator<>(
name,
new InMemoryKeyValueIterator(map.keySet(), true));
new InMemoryKeyValueIterator(map.keySet(), false));
}

@Override
Expand All @@ -166,11 +166,11 @@ public void close() {
private class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]> {
private final Iterator<Bytes> iter;

private InMemoryKeyValueIterator(final Set<Bytes> keySet, final boolean reverse) {
if (reverse) {
this.iter = new TreeSet<>(keySet).descendingIterator();
} else {
private InMemoryKeyValueIterator(final Set<Bytes> keySet, final boolean forward) {
if (forward) {
this.iter = new TreeSet<>(keySet).iterator();
} else {
this.iter = new TreeSet<>(keySet).descendingIterator();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class MergedSortedCacheKeyValueBytesStoreIterator

MergedSortedCacheKeyValueBytesStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
final KeyValueIterator<Bytes, byte[]> storeIterator,
final boolean reverse) {
super(cacheIterator, storeIterator, reverse);
final boolean forward) {
super(cacheIterator, storeIterator, forward);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class MergedSortedCacheSessionStoreIterator extends AbstractMergedSortedCacheSto
MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator,
final SegmentedCacheFunction cacheFunction) {
super(cacheIterator, storeIterator, false);
super(cacheIterator, storeIterator, true);
this.cacheFunction = cacheFunction;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class MergedSortedCacheWindowStoreIterator extends AbstractMergedSortedCacheStor

MergedSortedCacheWindowStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
final KeyValueIterator<Long, byte[]> storeIterator) {
super(cacheIterator, storeIterator, false);
super(cacheIterator, storeIterator, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class MergedSortedCacheWindowStoreKeyValueIterator
final long windowSize,
final SegmentedCacheFunction cacheFunction
) {
super(filteredCacheIterator, underlyingIterator, false);
super(filteredCacheIterator, underlyingIterator, true);
this.serdes = serdes;
this.windowSize = windowSize;
this.cacheFunction = cacheFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,27 +281,27 @@ public boolean isEmpty() {
}

synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), false);
return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), true);
}

synchronized Iterator<Bytes> reverseKeyRange(final Bytes from, final Bytes to) {
return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), true);
return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), false);
}

private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet, final boolean reverse) {
if (reverse) {
return new TreeSet<>(keySet).descendingIterator();
} else {
private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet, final boolean forward) {
if (forward) {
return new TreeSet<>(keySet).iterator();
} else {
return new TreeSet<>(keySet).descendingIterator();
}
}

synchronized Iterator<Bytes> allKeys() {
return keySetIterator(cache.navigableKeySet(), false);
return keySetIterator(cache.navigableKeySet(), true);
}

synchronized Iterator<Bytes> reverseAllKeys() {
return keySetIterator(cache.navigableKeySet(), true);
return keySetIterator(cache.navigableKeySet(), false);
}

synchronized LRUCacheEntry first() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,28 @@ class RocksDBRangeIterator extends RocksDbIterator {
// safe to just force lexicographic comparator here for now.
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
private final byte[] rawLastKey;
private final boolean reverse;
private final boolean forward;

RocksDBRangeIterator(final String storeName,
final RocksIterator iter,
final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
final Bytes from,
final Bytes to,
final boolean reverse) {
super(storeName, iter, openIterators, reverse);
this.reverse = reverse;
if (reverse) {
iter.seekForPrev(to.get());
rawLastKey = from.get();
if (rawLastKey == null) {
throw new NullPointerException("RocksDBRangeIterator: RawLastKey is null for key " + from);
}
} else {
final boolean forward) {
super(storeName, iter, openIterators, forward);
this.forward = forward;
if (forward) {
iter.seek(from.get());
rawLastKey = to.get();
if (rawLastKey == null) {
throw new NullPointerException("RocksDBRangeIterator: RawLastKey is null for key " + to);
}
} else {
iter.seekForPrev(to.get());
rawLastKey = from.get();
if (rawLastKey == null) {
throw new NullPointerException("RocksDBRangeIterator: RawLastKey is null for key " + from);
}
}
}

Expand All @@ -61,7 +61,7 @@ public KeyValue<Bytes, byte[]> makeNext() {
if (next == null) {
return allDone();
} else {
if (!reverse) {
if (forward) {
if (comparator.compare(next.key.get(), rawLastKey) <= 0) {
return next;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,18 +324,18 @@ public synchronized byte[] delete(final Bytes key) {
@Override
public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
return range(from, to, false);
return range(from, to, true);
}

@Override
public synchronized KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
final Bytes to) {
return range(from, to, true);
return range(from, to, false);
}

KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to,
final boolean reverse) {
final boolean forward) {
Objects.requireNonNull(from, "from cannot be null");
Objects.requireNonNull(to, "to cannot be null");

Expand All @@ -348,25 +348,25 @@ KeyValueIterator<Bytes, byte[]> range(final Bytes from,

validateStoreOpen();

final KeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = dbAccessor.range(from, to, reverse);
final KeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = dbAccessor.range(from, to, forward);
openIterators.add(rocksDBRangeIterator);

return rocksDBRangeIterator;
}

@Override
public synchronized KeyValueIterator<Bytes, byte[]> all() {
return all(false);
return all(true);
}

@Override
public KeyValueIterator<Bytes, byte[]> reverseAll() {
return all(true);
return all(false);
}

KeyValueIterator<Bytes, byte[]> all(final boolean reverse) {
KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
validateStoreOpen();
final KeyValueIterator<Bytes, byte[]> rocksDbIterator = dbAccessor.all(reverse);
final KeyValueIterator<Bytes, byte[]> rocksDbIterator = dbAccessor.all(forward);
openIterators.add(rocksDbIterator);
return rocksDbIterator;
}
Expand Down Expand Up @@ -493,9 +493,9 @@ void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,

KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to,
final boolean reverse);
final boolean forward);

KeyValueIterator<Bytes, byte[]> all(final boolean reverse);
KeyValueIterator<Bytes, byte[]> all(final boolean forward);

long approximateNumEntries() throws RocksDBException;

Expand Down Expand Up @@ -560,25 +560,25 @@ public byte[] getOnly(final byte[] key) throws RocksDBException {
@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to,
final boolean reverse) {
final boolean forward) {
return new RocksDBRangeIterator(
name,
db.newIterator(columnFamily),
openIterators,
from,
to,
reverse);
forward);
}

@Override
public KeyValueIterator<Bytes, byte[]> all(final boolean reverse) {
public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
final RocksIterator innerIterWithTimestamp = db.newIterator(columnFamily);
if (reverse) {
innerIterWithTimestamp.seekToLast();
} else {
if (forward) {
innerIterWithTimestamp.seekToFirst();
} else {
innerIterWithTimestamp.seekToLast();
}
return new RocksDbIterator(name, innerIterWithTimestamp, openIterators, reverse);
return new RocksDbIterator(name, innerIterWithTimestamp, openIterators, forward);
}

@Override
Expand Down
Loading

0 comments on commit b124cb7

Please sign in to comment.