diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index 60d0c65f253b3..b740bd93d4ca9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -63,14 +63,14 @@ public class AbstractRocksDBSegmentedBytesStore implements Se public KeyValueIterator fetch(final Bytes key, final long from, final long to) { - return fetch(key, from, to, false); + return fetch(key, from, to, true); } @Override public KeyValueIterator backwardFetch(final Bytes key, final long from, final long to) { - return fetch(key, from, to, true); + return fetch(key, from, to, false); } KeyValueIterator fetch(final Bytes key, @@ -95,7 +95,7 @@ public KeyValueIterator fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) { - return fetch(keyFrom, keyTo, from, to, false); + return fetch(keyFrom, keyTo, from, to, true); } @Override @@ -103,7 +103,7 @@ public KeyValueIterator backwardFetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) { - return fetch(keyFrom, keyTo, from, to, true); + return fetch(keyFrom, keyTo, from, to, false); } KeyValueIterator fetch(final Bytes keyFrom, @@ -134,7 +134,7 @@ KeyValueIterator fetch(final Bytes keyFrom, @Override public KeyValueIterator all() { - final List searchSpace = segments.allSegments(false); + final List searchSpace = segments.allSegments(true); return new SegmentIterator<>( searchSpace.iterator(), @@ -146,7 +146,7 @@ public KeyValueIterator all() { @Override public KeyValueIterator backwardAll() { - final List searchSpace = segments.allSegments(true); + final List searchSpace = segments.allSegments(false); return new SegmentIterator<>( searchSpace.iterator(), @@ -159,7 +159,7 @@ public KeyValueIterator backwardAll() { @Override public KeyValueIterator fetchAll(final long timeFrom, final long timeTo) { - final List searchSpace = segments.segments(timeFrom, timeTo, false); + final List searchSpace = segments.segments(timeFrom, timeTo, true); return new SegmentIterator<>( searchSpace.iterator(), @@ -172,7 +172,7 @@ public KeyValueIterator fetchAll(final long timeFrom, @Override public KeyValueIterator backwardFetchAll(final long timeFrom, final long timeTo) { - final List searchSpace = segments.segments(timeFrom, timeTo, true); + final List searchSpace = segments.segments(timeFrom, timeTo, false); return new SegmentIterator<>( searchSpace.iterator(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java index 1b29c6f13ae43..4b59c95ff2c91 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java @@ -118,19 +118,19 @@ public void openExisting(final ProcessorContext context, final long streamTime) } @Override - public List segments(final long timeFrom, final long timeTo, final boolean backward) { + public List segments(final long timeFrom, final long timeTo, final boolean forward) { final List result = new ArrayList<>(); final NavigableMap segmentsInRange; - if (backward) { + if (forward) { segmentsInRange = segments.subMap( segmentId(timeFrom), true, segmentId(timeTo), true - ).descendingMap(); + ); } else { segmentsInRange = segments.subMap( segmentId(timeFrom), true, segmentId(timeTo), true - ); + ).descendingMap(); } for (final S segment : segmentsInRange.values()) { if (segment.isOpen()) { @@ -141,13 +141,13 @@ public List segments(final long timeFrom, final long timeTo, final boolean ba } @Override - public List allSegments(final boolean backward) { + public List allSegments(final boolean forward) { final List result = new ArrayList<>(); final Collection values; - if (backward) { - values = segments.descendingMap().values(); - } else { + if (forward) { values = segments.values(); + } else { + values = segments.descendingMap().values(); } for (final S segment : values) { if (segment.isOpen()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index f2756b6f22124..cfcd15f3c007d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -216,7 +216,7 @@ public synchronized WindowStoreIterator fetch(final Bytes key, cacheIterator, hasNextCondition, cacheFunction ); - return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, false); + return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, true); } @Override @@ -246,7 +246,7 @@ public synchronized WindowStoreIterator backwardFetch(final Bytes key, final PeekingKeyValueIterator filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); - return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, true); + return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, false); } @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed @@ -290,7 +290,7 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, bytesSerdes, windowSize, cacheFunction, - false + true ); } @@ -336,7 +336,7 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes from, bytesSerdes, windowSize, cacheFunction, - true + false ); } @@ -358,7 +358,7 @@ public KeyValueIterator, byte[]> fetchAll(final long timeFrom, bytesSerdes, windowSize, cacheFunction, - false + true ); } @@ -382,7 +382,7 @@ public KeyValueIterator, byte[]> backwardFetchAll(final Instant bytesSerdes, windowSize, cacheFunction, - true + false ); } @@ -399,7 +399,7 @@ public KeyValueIterator, byte[]> all() { bytesSerdes, windowSize, cacheFunction, - false + true ); } @@ -416,7 +416,7 @@ public KeyValueIterator, byte[]> backwardAll() { bytesSerdes, windowSize, cacheFunction, - true + false ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index b3fb397bc466d..e614cd3eea3ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -145,7 +145,6 @@ public void put(final Bytes key, final byte[] value, final long windowStartTimes @Override public byte[] fetch(final Bytes key, final long windowStartTimestamp) { - Objects.requireNonNull(key, "key cannot be null"); removeExpiredSegments(); @@ -165,17 +164,17 @@ public byte[] fetch(final Bytes key, final long windowStartTimestamp) { @Deprecated @Override public WindowStoreIterator fetch(final Bytes key, final long timeFrom, final long timeTo) { - return fetch(key, timeFrom, timeTo, false); + return fetch(key, timeFrom, timeTo, true); } @Override public WindowStoreIterator backwardFetch(final Bytes key, final Instant from, final Instant to) { final long timeFrom = ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")); final long timeTo = ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")); - return fetch(key, timeFrom, timeTo, true); + return fetch(key, timeFrom, timeTo, false); } - WindowStoreIterator fetch(final Bytes key, final long timeFrom, final long timeTo, final boolean backward) { + WindowStoreIterator fetch(final Bytes key, final long timeFrom, final long timeTo, final boolean forward) { Objects.requireNonNull(key, "key cannot be null"); removeExpiredSegments(); @@ -187,17 +186,17 @@ WindowStoreIterator fetch(final Bytes key, final long timeFrom, final lo return WrappedInMemoryWindowStoreIterator.emptyIterator(); } - if (backward) { + if (forward) { return registerNewWindowStoreIterator( key, segmentMap.subMap(minTime, true, timeTo, true) - .descendingMap().entrySet().iterator() + .entrySet().iterator() ); } else { return registerNewWindowStoreIterator( key, segmentMap.subMap(minTime, true, timeTo, true) - .entrySet().iterator() + .descendingMap().entrySet().iterator() ); } } @@ -208,7 +207,7 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) { - return fetch(from, to, timeFrom, timeTo, false); + return fetch(from, to, timeFrom, timeTo, true); } @Override @@ -218,14 +217,14 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes from, final Instant toTime) { final long timeFrom = ApiUtils.validateMillisecondInstant(fromTime, prepareMillisCheckFailMsgPrefix(fromTime, "fromTime")); final long timeTo = ApiUtils.validateMillisecondInstant(toTime, prepareMillisCheckFailMsgPrefix(toTime, "toTime")); - return fetch(from, to, timeFrom, timeTo, true); + return fetch(from, to, timeFrom, timeTo, false); } KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo, - final boolean backward) { + final boolean forward) { Objects.requireNonNull(from, "from key cannot be null"); Objects.requireNonNull(to, "to key cannot be null"); @@ -246,36 +245,36 @@ KeyValueIterator, byte[]> fetch(final Bytes from, return KeyValueIterators.emptyIterator(); } - if (backward) { + if (forward) { return registerNewWindowedKeyValueIterator( from, to, segmentMap.subMap(minTime, true, timeTo, true) - .descendingMap().entrySet().iterator()); + .entrySet().iterator() + ); } else { return registerNewWindowedKeyValueIterator( from, to, segmentMap.subMap(minTime, true, timeTo, true) - .entrySet().iterator() - ); + .descendingMap().entrySet().iterator()); } } @Deprecated @Override public KeyValueIterator, byte[]> fetchAll(final long timeFrom, final long timeTo) { - return fetchAll(timeFrom, timeTo, false); + return fetchAll(timeFrom, timeTo, true); } @Override public KeyValueIterator, byte[]> backwardFetchAll(final Instant from, final Instant to) { final long timeFrom = ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from")); final long timeTo = ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to")); - return fetchAll(timeFrom, timeTo, true); + return fetchAll(timeFrom, timeTo, false); } - KeyValueIterator, byte[]> fetchAll(final long timeFrom, final long timeTo, final boolean backward) { + KeyValueIterator, byte[]> fetchAll(final long timeFrom, final long timeTo, final boolean forward) { removeExpiredSegments(); // add one b/c records expire exactly retentionPeriod ms after created @@ -285,18 +284,18 @@ KeyValueIterator, byte[]> fetchAll(final long timeFrom, final lo return KeyValueIterators.emptyIterator(); } - if (backward) { + if (forward) { return registerNewWindowedKeyValueIterator( null, null, segmentMap.subMap(minTime, true, timeTo, true) - .descendingMap().entrySet().iterator()); + .entrySet().iterator()); } else { return registerNewWindowedKeyValueIterator( null, null, segmentMap.subMap(minTime, true, timeTo, true) - .entrySet().iterator()); + .descendingMap().entrySet().iterator()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java index 07082e8ea3631..79ada1f07db05 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java @@ -202,6 +202,6 @@ interface KeySchema { * @param to * @return List of segments to search */ - List segmentsToSearch(Segments segments, long from, long to, boolean backward); + List segmentsToSearch(Segments segments, long from, long to, boolean forward); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java index 38a5d318bcf66..7e50b98452165 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java @@ -34,9 +34,9 @@ interface Segments { void openExisting(final ProcessorContext context, final long streamTime); - List segments(final long timeFrom, final long timeTo, final boolean backward); + List segments(final long timeFrom, final long timeTo, final boolean forward); - List allSegments(final boolean backward); + List allSegments(final boolean forward); void flush(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index 568297e0d7825..326b86945f8e4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -88,8 +88,8 @@ public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes public List segmentsToSearch(final Segments segments, final long from, final long to, - final boolean backward) { - return segments.segments(from, Long.MAX_VALUE, backward); + final boolean forward) { + return segments.segments(from, Long.MAX_VALUE, forward); } private static K extractKey(final byte[] binaryKey, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java index a675ae490f8c9..a467af8cf5a1f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java @@ -94,8 +94,8 @@ public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, public List segmentsToSearch(final Segments segments, final long from, final long to, - final boolean backward) { - return segments.segments(from, to, backward); + final boolean forward) { + return segments.segments(from, to, forward); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java index 442723d0c7459..aeef8ce8e13f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java @@ -187,7 +187,7 @@ public void shouldGetSegmentsWithinTimeRange() { segments.getOrCreateSegmentIfLive(3, context, streamTime); segments.getOrCreateSegmentIfLive(4, context, streamTime); - final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, false); + final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, true); assertEquals(3, segments.size()); assertEquals(0, segments.get(0).id); assertEquals(1, segments.get(1).id); @@ -207,7 +207,7 @@ public void shouldGetSegmentsWithinBackwardTimeRange() { segments.getOrCreateSegmentIfLive(3, context, streamTime); segments.getOrCreateSegmentIfLive(4, context, streamTime); - final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, true); + final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, false); assertEquals(3, segments.size()); assertEquals(0, segments.get(2).id); assertEquals(1, segments.get(1).id); @@ -222,7 +222,7 @@ public void shouldGetSegmentsWithinTimeRangeOutOfOrder() { updateStreamTimeAndCreateSegment(1); updateStreamTimeAndCreateSegment(3); - final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, false); + final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, true); assertEquals(3, segments.size()); assertEquals(0, segments.get(0).id); assertEquals(1, segments.get(1).id); @@ -237,7 +237,7 @@ public void shouldGetSegmentsWithinTimeBackwardRangeOutOfOrder() { updateStreamTimeAndCreateSegment(1); updateStreamTimeAndCreateSegment(3); - final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, true); + final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, false); assertEquals(3, segments.size()); assertEquals(2, segments.get(0).id); assertEquals(1, segments.get(1).id); @@ -344,7 +344,7 @@ public void shouldClearSegmentsOnClose() { } private void verifyCorrectSegments(final long first, final int numSegments) { - final List result = this.segments.segments(0, Long.MAX_VALUE, false); + final List result = this.segments.segments(0, Long.MAX_VALUE, true); assertEquals(numSegments, result.size()); for (int i = 0; i < numSegments; i++) { assertEquals(i + first, result.get(i).id); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java index 7559b846badbf..0d69d933cf9f6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java @@ -73,7 +73,7 @@ public void shouldIterateOverValueFromBothIterators() { ); final MergedSortedCacheWindowStoreIterator iterator = new MergedSortedCacheWindowStoreIterator( - cacheIterator, storeIterator, false + cacheIterator, storeIterator, true ); int index = 0; while (iterator.hasNext()) { @@ -111,7 +111,7 @@ public void shouldReverseIterateOverValueFromBothIterators() { ); final MergedSortedCacheWindowStoreIterator iterator = new MergedSortedCacheWindowStoreIterator( - cacheIterator, storeIterator, true + cacheIterator, storeIterator, false ); int index = 0; Collections.reverse(expectedKvPairs); @@ -135,7 +135,7 @@ public void shouldPeekNextStoreKey() { namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes) ); final MergedSortedCacheWindowStoreIterator iterator = new MergedSortedCacheWindowStoreIterator( - cacheIterator, storeIterator, false + cacheIterator, storeIterator, true ); assertThat(iterator.peekNextKey(), equalTo(0L)); iterator.next(); @@ -156,7 +156,7 @@ public void shouldPeekNextStoreKeyReverse() { SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes) ); final MergedSortedCacheWindowStoreIterator iterator = new MergedSortedCacheWindowStoreIterator( - cacheIterator, storeIterator, true + cacheIterator, storeIterator, false ); assertThat(iterator.peekNextKey(), equalTo(10L)); iterator.next(); @@ -170,9 +170,18 @@ public void shouldPeekNextCacheKey() { cache.put(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(WindowKeySchema.toStoreKeyBinary("a", 10L, 0, stateSerdes)), new LRUCacheEntry("b".getBytes())); final Bytes fromBytes = WindowKeySchema.toStoreKeyBinary("a", 0, 0, stateSerdes); final Bytes toBytes = WindowKeySchema.toStoreKeyBinary("a", 100, 0, stateSerdes); - final KeyValueIterator storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator())); - final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes)); - final MergedSortedCacheWindowStoreIterator iterator = new MergedSortedCacheWindowStoreIterator(cacheIterator, storeIterator, false); + final KeyValueIterator storeIterator = + new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator())); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range( + namespace, + SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), + SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes) + ); + final MergedSortedCacheWindowStoreIterator iterator = new MergedSortedCacheWindowStoreIterator( + cacheIterator, + storeIterator, + true + ); assertThat(iterator.peekNextKey(), equalTo(0L)); iterator.next(); assertThat(iterator.peekNextKey(), equalTo(10L)); @@ -187,9 +196,16 @@ public void shouldPeekNextCacheKeyReverse() { final Bytes toBytes = WindowKeySchema.toStoreKeyBinary("a", 100, 0, stateSerdes); final KeyValueIterator storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator())); - final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = - cache.reverseRange(namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes)); - final MergedSortedCacheWindowStoreIterator iterator = new MergedSortedCacheWindowStoreIterator(cacheIterator, storeIterator, true); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.reverseRange( + namespace, + SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes), + SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes) + ); + final MergedSortedCacheWindowStoreIterator iterator = new MergedSortedCacheWindowStoreIterator( + cacheIterator, + storeIterator, + false + ); assertThat(iterator.peekNextKey(), equalTo(10L)); iterator.next(); assertThat(iterator.peekNextKey(), equalTo(0L)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java index 1fc6c07507312..63cc7f7f2db5a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java @@ -146,7 +146,7 @@ public void shouldPeekNextKeyFromReverseCache() { @Test public void shouldIterateBothStoreAndCache() { - final MergedSortedCacheWindowStoreKeyValueIterator iterator = createIterator(storeKvs, cacheKvs, false); + final MergedSortedCacheWindowStoreKeyValueIterator iterator = createIterator(storeKvs, cacheKvs, true); assertThat(convertKeyValuePair(iterator.next()), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey))); assertThat(convertKeyValuePair(iterator.next()), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey))); assertFalse(iterator.hasNext()); @@ -154,7 +154,7 @@ public void shouldIterateBothStoreAndCache() { @Test public void shouldReverseIterateBothStoreAndCache() { - final MergedSortedCacheWindowStoreKeyValueIterator iterator = createIterator(storeKvs, cacheKvs, true); + final MergedSortedCacheWindowStoreKeyValueIterator iterator = createIterator(storeKvs, cacheKvs, false); assertThat(convertKeyValuePair(iterator.next()), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey))); assertThat(convertKeyValuePair(iterator.next()), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey))); assertFalse(iterator.hasNext()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java index a275df74e77e9..558f1c9471b1e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java @@ -188,7 +188,7 @@ public void shouldGetSegmentsWithinTimeRange() { segments.getOrCreateSegmentIfLive(3, context, streamTime); segments.getOrCreateSegmentIfLive(4, context, streamTime); - final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, false); + final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, true); assertEquals(3, segments.size()); assertEquals(0, segments.get(0).id); assertEquals(1, segments.get(1).id); @@ -208,7 +208,7 @@ public void shouldGetSegmentsWithinBackwardTimeRange() { segments.getOrCreateSegmentIfLive(3, context, streamTime); segments.getOrCreateSegmentIfLive(4, context, streamTime); - final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, true); + final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, false); assertEquals(3, segments.size()); assertEquals(0, segments.get(2).id); assertEquals(1, segments.get(1).id); @@ -223,7 +223,7 @@ public void shouldGetSegmentsWithinTimeRangeOutOfOrder() { updateStreamTimeAndCreateSegment(1); updateStreamTimeAndCreateSegment(3); - final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, false); + final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, true); assertEquals(3, segments.size()); assertEquals(0, segments.get(0).id); assertEquals(1, segments.get(1).id); @@ -238,7 +238,7 @@ public void shouldGetSegmentsWithinBackwardTimeRangeOutOfOrder() { updateStreamTimeAndCreateSegment(1); updateStreamTimeAndCreateSegment(3); - final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, true); + final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, false); assertEquals(3, segments.size()); assertEquals(0, segments.get(2).id); assertEquals(1, segments.get(1).id); @@ -345,7 +345,7 @@ public void shouldClearSegmentsOnClose() { } private void verifyCorrectSegments(final long first, final int numSegments) { - final List result = this.segments.segments(0, Long.MAX_VALUE, false); + final List result = this.segments.segments(0, Long.MAX_VALUE, true); assertEquals(numSegments, result.size()); for (int i = 0; i < numSegments; i++) { assertEquals(i + first, result.get(i).id);