Skip to content

Commit

Permalink
improve format based on feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Oct 7, 2020
1 parent ae7e3ed commit 64a95c0
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -420,30 +420,43 @@ private long currentSegmentLastTime() {
}

private void getNextSegmentIterator() {
++currentSegmentId;
lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp);
if (forward) {
++currentSegmentId;
lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp);

if (currentSegmentId > lastSegmentId) {
current = null;
return;
}
if (currentSegmentId > lastSegmentId) {
current = null;
return;
}

setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());
setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());

current.close();

current.close();
if (forward) {
current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
} else {
--currentSegmentId;

if (currentSegmentId < lastSegmentId) {
current = null;
return;
}

setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());

current.close();

current = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo);
}

}

private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRangeEndTime) {
if (cacheFunction.segmentId(lowerRangeEndTime) != cacheFunction.segmentId(upperRangeEndTime)) {
throw new IllegalStateException("Error iterating over segments: segment interval has changed");
}

if (keyFrom == keyTo) {
if (keyFrom.equals(keyTo)) {
cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime));
cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
return registerNewIterator(
key,
key,
Long.MAX_VALUE, endTimeMap.entrySet().iterator(),
Long.MAX_VALUE,
endTimeMap.entrySet().iterator(),
true);
}

Expand Down Expand Up @@ -484,13 +485,16 @@ private boolean setInnerIterators() {
recordIterator = entries.iterator();
} else {
final Set<Entry<Long, byte[]>> entries;
if (forward) entries = nextKeyEntry.getValue()
.headMap(latestSessionStartTime, true)
.descendingMap()
.entrySet();
else entries = nextKeyEntry.getValue()
.headMap(latestSessionStartTime, true)
.entrySet();
if (forward) {
entries = nextKeyEntry.getValue()
.headMap(latestSessionStartTime, true)
.descendingMap()
.entrySet();
} else {
entries = nextKeyEntry.getValue()
.headMap(latestSessionStartTime, true)
.entrySet();
}
recordIterator = entries.iterator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ public void shouldFetchExactKeys() {
@Test
public void shouldBackwardFetchExactKeys() {
sessionStore = buildSessionStore(0x7a00000000000000L, Serdes.String(), Serdes.Long());
sessionStore.init(context, sessionStore);
sessionStore.init((StateStoreContext) context, sessionStore);

sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L);
Expand Down Expand Up @@ -473,7 +473,7 @@ public void shouldBackwardFetchAndIterateOverExactBinaryKeys() {
final SessionStore<Bytes, String> sessionStore =
buildSessionStore(RETENTION_PERIOD, Serdes.Bytes(), Serdes.String());

sessionStore.init(context, sessionStore);
sessionStore.init((StateStoreContext) context, sessionStore);

final Bytes key1 = Bytes.wrap(new byte[] {0});
final Bytes key2 = Bytes.wrap(new byte[] {0, 0});
Expand Down

0 comments on commit 64a95c0

Please sign in to comment.