Skip to content

Commit

Permalink
KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed o…
Browse files Browse the repository at this point in the history
…rder (#11292)

When introducing backward iterator for WindowStroe in #9138, we forgot to make "each segment" in reverse order (i.e. in descendingMap) in InMemoryWindowStore. Fix it and add integration tests for it.

Currently, in Window store, we store records in [segments -> [records] ].

For example:
window size = 500,
input records:

key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window

So, internally, the "a" and "b" will be in the same segment, and "c" in another segments.
segments: [0 /* window start */, records], [500, records].
And the records for window start 0 will be "a" and "b".
the records for window start 500 will be "c".

Before this change, we did have a reverse iterator for segments, but not in "records". So, when doing backwardFetchAll, we'll have the records returned in order: "c", "a", "b", which should be "c", "b", "a" obviously.

Reviewers: Jorge Esteban Quilcate Otoya <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
showuon authored Sep 13, 2021
1 parent a03bda6 commit 9628c12
Show file tree
Hide file tree
Showing 3 changed files with 451 additions and 288 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -499,10 +499,14 @@ Iterator<Map.Entry<Bytes, byte[]>> setRecordIterator() {
final Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>> currentSegment = segmentIterator.next();
currentTime = currentSegment.getKey();

if (allKeys) {
return currentSegment.getValue().entrySet().iterator();
final ConcurrentNavigableMap<Bytes, byte[]> subMap = allKeys ?
currentSegment.getValue() :
currentSegment.getValue().subMap(keyFrom, true, keyTo, true);

if (forward) {
return subMap.entrySet().iterator();
} else {
return currentSegment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator();
return subMap.descendingMap().entrySet().iterator();
}
}

Expand Down
Loading

0 comments on commit 9628c12

Please sign in to comment.