diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index c9bb97983ac79..31589aa03f533 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -420,22 +420,35 @@ private long currentSegmentLastTime() { } private void getNextSegmentIterator() { - ++currentSegmentId; - lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp); + if (forward) { + ++currentSegmentId; + lastSegmentId = cacheFunction.segmentId(maxObservedTimestamp); - if (currentSegmentId > lastSegmentId) { - current = null; - return; - } + if (currentSegmentId > lastSegmentId) { + current = null; + return; + } - setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); + setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); + + current.close(); - current.close(); - if (forward) { current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); } else { + --currentSegmentId; + + if (currentSegmentId < lastSegmentId) { + current = null; + return; + } + + setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime()); + + current.close(); + current = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo); } + } private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRangeEndTime) { @@ -443,7 +456,7 @@ private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRang throw new IllegalStateException("Error iterating over segments: segment interval has changed"); } - if (keyFrom == keyTo) { + if (keyFrom.equals(keyTo)) { cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime)); cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime)); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index f7bea499d20a7..3aad2d1e0458d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -250,7 +250,8 @@ public KeyValueIterator, byte[]> fetch(final Bytes key) { return registerNewIterator( key, key, - Long.MAX_VALUE, endTimeMap.entrySet().iterator(), + Long.MAX_VALUE, + endTimeMap.entrySet().iterator(), true); } @@ -484,13 +485,16 @@ private boolean setInnerIterators() { recordIterator = entries.iterator(); } else { final Set> entries; - if (forward) entries = nextKeyEntry.getValue() - .headMap(latestSessionStartTime, true) - .descendingMap() - .entrySet(); - else entries = nextKeyEntry.getValue() - .headMap(latestSessionStartTime, true) - .entrySet(); + if (forward) { + entries = nextKeyEntry.getValue() + .headMap(latestSessionStartTime, true) + .descendingMap() + .entrySet(); + } else { + entries = nextKeyEntry.getValue() + .headMap(latestSessionStartTime, true) + .entrySet(); + } recordIterator = entries.iterator(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java index 795515e7de261..8130e02e66523 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java @@ -405,7 +405,7 @@ public void shouldFetchExactKeys() { @Test public void shouldBackwardFetchExactKeys() { sessionStore = buildSessionStore(0x7a00000000000000L, Serdes.String(), Serdes.Long()); - sessionStore.init(context, sessionStore); + sessionStore.init((StateStoreContext) context, sessionStore); sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); @@ -473,7 +473,7 @@ public void shouldBackwardFetchAndIterateOverExactBinaryKeys() { final SessionStore sessionStore = buildSessionStore(RETENTION_PERIOD, Serdes.Bytes(), Serdes.String()); - sessionStore.init(context, sessionStore); + sessionStore.init((StateStoreContext) context, sessionStore); final Bytes key1 = Bytes.wrap(new byte[] {0}); final Bytes key2 = Bytes.wrap(new byte[] {0, 0});