Skip to content

Commit

Permalink
replace backward with forward flag
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Aug 22, 2020
1 parent 2a428d3 commit 0871d19
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
final long from,
final long to) {
return fetch(key, from, to, false);
return fetch(key, from, to, true);
}

@Override
public KeyValueIterator<Bytes, byte[]> backwardFetch(final Bytes key,
final long from,
final long to) {
return fetch(key, from, to, true);
return fetch(key, from, to, false);
}

KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
Expand All @@ -95,15 +95,15 @@ public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo,
final long from,
final long to) {
return fetch(keyFrom, keyTo, from, to, false);
return fetch(keyFrom, keyTo, from, to, true);
}

@Override
public KeyValueIterator<Bytes, byte[]> backwardFetch(final Bytes keyFrom,
final Bytes keyTo,
final long from,
final long to) {
return fetch(keyFrom, keyTo, from, to, true);
return fetch(keyFrom, keyTo, from, to, false);
}

KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
Expand Down Expand Up @@ -134,7 +134,7 @@ KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,

@Override
public KeyValueIterator<Bytes, byte[]> all() {
final List<S> searchSpace = segments.allSegments(false);
final List<S> searchSpace = segments.allSegments(true);

return new SegmentIterator<>(
searchSpace.iterator(),
Expand All @@ -146,7 +146,7 @@ public KeyValueIterator<Bytes, byte[]> all() {

@Override
public KeyValueIterator<Bytes, byte[]> backwardAll() {
final List<S> searchSpace = segments.allSegments(true);
final List<S> searchSpace = segments.allSegments(false);

return new SegmentIterator<>(
searchSpace.iterator(),
Expand All @@ -159,7 +159,7 @@ public KeyValueIterator<Bytes, byte[]> backwardAll() {
@Override
public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
final List<S> searchSpace = segments.segments(timeFrom, timeTo, false);
final List<S> searchSpace = segments.segments(timeFrom, timeTo, true);

return new SegmentIterator<>(
searchSpace.iterator(),
Expand All @@ -172,7 +172,7 @@ public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
@Override
public KeyValueIterator<Bytes, byte[]> backwardFetchAll(final long timeFrom,
final long timeTo) {
final List<S> searchSpace = segments.segments(timeFrom, timeTo, true);
final List<S> searchSpace = segments.segments(timeFrom, timeTo, false);

return new SegmentIterator<>(
searchSpace.iterator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,19 @@ public void openExisting(final ProcessorContext context, final long streamTime)
}

@Override
public List<S> segments(final long timeFrom, final long timeTo, final boolean backward) {
public List<S> segments(final long timeFrom, final long timeTo, final boolean forward) {
final List<S> result = new ArrayList<>();
final NavigableMap<Long, S> segmentsInRange;
if (backward) {
if (forward) {
segmentsInRange = segments.subMap(
segmentId(timeFrom), true,
segmentId(timeTo), true
).descendingMap();
);
} else {
segmentsInRange = segments.subMap(
segmentId(timeFrom), true,
segmentId(timeTo), true
);
).descendingMap();
}
for (final S segment : segmentsInRange.values()) {
if (segment.isOpen()) {
Expand All @@ -141,13 +141,13 @@ public List<S> segments(final long timeFrom, final long timeTo, final boolean ba
}

@Override
public List<S> allSegments(final boolean backward) {
public List<S> allSegments(final boolean forward) {
final List<S> result = new ArrayList<>();
final Collection<S> values;
if (backward) {
values = segments.descendingMap().values();
} else {
if (forward) {
values = segments.values();
} else {
values = segments.descendingMap().values();
}
for (final S segment : values) {
if (segment.isOpen()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key,
cacheIterator, hasNextCondition, cacheFunction
);

return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, false);
return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, true);
}

@Override
Expand Down Expand Up @@ -246,7 +246,7 @@ public synchronized WindowStoreIterator<byte[]> backwardFetch(final Bytes key,
final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator =
new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);

return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, true);
return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, false);
}

@SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
Expand Down Expand Up @@ -290,7 +290,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
bytesSerdes,
windowSize,
cacheFunction,
false
true
);
}

Expand Down Expand Up @@ -336,7 +336,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes from,
bytesSerdes,
windowSize,
cacheFunction,
true
false
);
}

Expand All @@ -358,7 +358,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
bytesSerdes,
windowSize,
cacheFunction,
false
true
);
}

Expand All @@ -382,7 +382,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final Instant
bytesSerdes,
windowSize,
cacheFunction,
true
false
);
}

Expand All @@ -399,7 +399,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
bytesSerdes,
windowSize,
cacheFunction,
false
true
);
}

Expand All @@ -416,7 +416,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
bytesSerdes,
windowSize,
cacheFunction,
true
false
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ public void put(final Bytes key, final byte[] value, final long windowStartTimes

@Override
public byte[] fetch(final Bytes key, final long windowStartTimestamp) {

Objects.requireNonNull(key, "key cannot be null");

removeExpiredSegments();
Expand All @@ -165,17 +164,17 @@ public byte[] fetch(final Bytes key, final long windowStartTimestamp) {
@Deprecated
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
return fetch(key, timeFrom, timeTo, false);
return fetch(key, timeFrom, timeTo, true);
}

@Override
public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final Instant from, final Instant to) {
final long timeFrom = ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
final long timeTo = ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
return fetch(key, timeFrom, timeTo, true);
return fetch(key, timeFrom, timeTo, false);
}

WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo, final boolean backward) {
WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo, final boolean forward) {
Objects.requireNonNull(key, "key cannot be null");

removeExpiredSegments();
Expand All @@ -187,17 +186,17 @@ WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final lo
return WrappedInMemoryWindowStoreIterator.emptyIterator();
}

if (backward) {
if (forward) {
return registerNewWindowStoreIterator(
key,
segmentMap.subMap(minTime, true, timeTo, true)
.descendingMap().entrySet().iterator()
.entrySet().iterator()
);
} else {
return registerNewWindowStoreIterator(
key,
segmentMap.subMap(minTime, true, timeTo, true)
.entrySet().iterator()
.descendingMap().entrySet().iterator()
);
}
}
Expand All @@ -208,7 +207,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to,
final long timeFrom,
final long timeTo) {
return fetch(from, to, timeFrom, timeTo, false);
return fetch(from, to, timeFrom, timeTo, true);
}

@Override
Expand All @@ -218,14 +217,14 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes from,
final Instant toTime) {
final long timeFrom = ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime"));
final long timeTo = ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime"));
return fetch(from, to, timeFrom, timeTo, true);
return fetch(from, to, timeFrom, timeTo, false);
}

KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to,
final long timeFrom,
final long timeTo,
final boolean backward) {
final boolean forward) {
Objects.requireNonNull(from, "from key cannot be null");
Objects.requireNonNull(to, "to key cannot be null");

Expand All @@ -246,36 +245,36 @@ KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
return KeyValueIterators.emptyIterator();
}

if (backward) {
if (forward) {
return registerNewWindowedKeyValueIterator(
from,
to,
segmentMap.subMap(minTime, true, timeTo, true)
.descendingMap().entrySet().iterator());
.entrySet().iterator()
);
} else {
return registerNewWindowedKeyValueIterator(
from,
to,
segmentMap.subMap(minTime, true, timeTo, true)
.entrySet().iterator()
);
.descendingMap().entrySet().iterator());
}
}

@Deprecated
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
return fetchAll(timeFrom, timeTo, false);
return fetchAll(timeFrom, timeTo, true);
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final Instant from, final Instant to) {
final long timeFrom = ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
final long timeTo = ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));
return fetchAll(timeFrom, timeTo, true);
return fetchAll(timeFrom, timeTo, false);
}

KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo, final boolean backward) {
KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo, final boolean forward) {
removeExpiredSegments();

// add one b/c records expire exactly retentionPeriod ms after created
Expand All @@ -285,18 +284,18 @@ KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final lo
return KeyValueIterators.emptyIterator();
}

if (backward) {
if (forward) {
return registerNewWindowedKeyValueIterator(
null,
null,
segmentMap.subMap(minTime, true, timeTo, true)
.descendingMap().entrySet().iterator());
.entrySet().iterator());
} else {
return registerNewWindowedKeyValueIterator(
null,
null,
segmentMap.subMap(minTime, true, timeTo, true)
.entrySet().iterator());
.descendingMap().entrySet().iterator());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,6 @@ interface KeySchema {
* @param to
* @return List of segments to search
*/
<S extends Segment> List<S> segmentsToSearch(Segments<S> segments, long from, long to, boolean backward);
<S extends Segment> List<S> segmentsToSearch(Segments<S> segments, long from, long to, boolean forward);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ interface Segments<S extends Segment> {

void openExisting(final ProcessorContext context, final long streamTime);

List<S> segments(final long timeFrom, final long timeTo, final boolean backward);
List<S> segments(final long timeFrom, final long timeTo, final boolean forward);

List<S> allSegments(final boolean backward);
List<S> allSegments(final boolean forward);

void flush();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes
public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments,
final long from,
final long to,
final boolean backward) {
return segments.segments(from, Long.MAX_VALUE, backward);
final boolean forward) {
return segments.segments(from, Long.MAX_VALUE, forward);
}

private static <K> K extractKey(final byte[] binaryKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom,
public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments,
final long from,
final long to,
final boolean backward) {
return segments.segments(from, to, backward);
final boolean forward) {
return segments.segments(from, to, forward);
}

/**
Expand Down
Loading

0 comments on commit 0871d19

Please sign in to comment.