Skip to content

Commit

Permalink
KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue (
Browse files Browse the repository at this point in the history
…#11337)

In #9139, we added backward iterator on SessionStore. But there is a bug that when fetch/backwardFetch the key range, if there are multiple records in the same session window, we can't return the data in the correct order.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman<apache.org>
  • Loading branch information
showuon authored Sep 29, 2021
1 parent 71f1cb7 commit 361b784
Show file tree
Hide file tree
Showing 4 changed files with 320 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, fina
removeExpiredSegments();


return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false);
return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), true);
}

@Override
Expand All @@ -292,7 +292,7 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFr
removeExpiredSegments();

return registerNewIterator(
keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true);
keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.StreamsTestUtils.toSet;
import static org.apache.kafka.common.utils.Utils.toList;
import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
Expand Down Expand Up @@ -121,15 +121,15 @@ public void shouldPutAndFindSessionsInRange() {

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(key, 0, 1000L)
) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(expected, toList(values));
}

final List<KeyValue<Windowed<String>, Long>> expected2 =
Collections.singletonList(KeyValue.pair(a2, 2L));

try (final KeyValueIterator<Windowed<String>, Long> values2 = sessionStore.findSessions(key, 400L, 600L)
) {
assertEquals(new HashSet<>(expected2), toSet(values2));
assertEquals(expected2, toList(values2));
}
}

Expand All @@ -143,28 +143,29 @@ public void shouldPutAndBackwardFindSessionsInRange() {
sessionStore.put(new Windowed<>(key, new SessionWindow(1500L, 2000L)), 1L);
sessionStore.put(new Windowed<>(key, new SessionWindow(2500L, 3000L)), 2L);

final List<KeyValue<Windowed<String>, Long>> expected =
asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L));
final LinkedList<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(a1, 1L));
expected.add(KeyValue.pair(a2, 2L));

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.backwardFindSessions(key, 0, 1000L)) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(toList(expected.descendingIterator()), toList(values));
}

final List<KeyValue<Windowed<String>, Long>> expected2 =
Collections.singletonList(KeyValue.pair(a2, 2L));

try (final KeyValueIterator<Windowed<String>, Long> values2 = sessionStore.backwardFindSessions(key, 400L, 600L)) {
assertEquals(new HashSet<>(expected2), toSet(values2));
assertEquals(expected2, toList(values2));
}
}

@Test
public void shouldFetchAllSessionsWithSameRecordKey() {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L),
KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L),
KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L),
KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L));
final LinkedList<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L));
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L));
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L));
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L));

for (final KeyValue<Windowed<String>, Long> kv : expected) {
sessionStore.put(kv.key, kv.value);
Expand All @@ -174,18 +175,17 @@ public void shouldFetchAllSessionsWithSameRecordKey() {
sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L);

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.fetch("a")) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(expected, toList(values));
}
}

@Test
public void shouldBackwardFetchAllSessionsWithSameRecordKey() {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L),
KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L),
KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L),
KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)
);
final LinkedList<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L));
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L));
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L));
expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L));

for (final KeyValue<Windowed<String>, Long> kv : expected) {
sessionStore.put(kv.key, kv.value);
Expand All @@ -195,18 +195,18 @@ public void shouldBackwardFetchAllSessionsWithSameRecordKey() {
sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L);

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.backwardFetch("a")) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(toList(expected.descendingIterator()), toList(values));
}
}

@Test
public void shouldFetchAllSessionsWithinKeyRange() {
final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L),
KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L),

KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L),
KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L));
final List<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L));
expected.add(KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L));
expected.add(KeyValue.pair(new Windowed<>("aaaa", new SessionWindow(100, 100)), 6L));
expected.add(KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L));
expected.add(KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L));

for (final KeyValue<Windowed<String>, Long> kv : expected) {
sessionStore.put(kv.key, kv.value);
Expand All @@ -217,19 +217,22 @@ public void shouldFetchAllSessionsWithinKeyRange() {
sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L);

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.fetch("aa", "bb")) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(expected, toList(values));
}

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions("aa", "bb", 0L, Long.MAX_VALUE)) {
assertEquals(expected, toList(values));
}
}

@Test
public void shouldBackwardFetchAllSessionsWithinKeyRange() {
final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L),
KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L),

KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L),
KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L)
);
final LinkedList<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L));
expected.add(KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L));
expected.add(KeyValue.pair(new Windowed<>("aaaa", new SessionWindow(100, 100)), 6L));
expected.add(KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L));
expected.add(KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L));

for (final KeyValue<Windowed<String>, Long> kv : expected) {
sessionStore.put(kv.key, kv.value);
Expand All @@ -240,7 +243,11 @@ public void shouldBackwardFetchAllSessionsWithinKeyRange() {
sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L);

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.backwardFetch("aa", "bb")) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(toList(expected.descendingIterator()), toList(values));
}

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.backwardFindSessions("aa", "bb", 0L, Long.MAX_VALUE)) {
assertEquals(toList(expected.descendingIterator()), toList(values));
}
}

Expand Down Expand Up @@ -272,7 +279,7 @@ public void shouldFindValuesWithinMergingSessionWindowRange() {
KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L));

try (final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessions(key, -1, 1000L)) {
assertEquals(new HashSet<>(expected), toSet(results));
assertEquals(expected, toList(results));
}
}

Expand All @@ -282,13 +289,12 @@ public void shouldBackwardFindValuesWithinMergingSessionWindowRange() {
sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L);
sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L);

final List<KeyValue<Windowed<String>, Long>> expected = asList(
KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L),
KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L)
);
final LinkedList<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
expected.add(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L));
expected.add(KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L));

try (final KeyValueIterator<Windowed<String>, Long> results = sessionStore.backwardFindSessions(key, -1, 1000L)) {
assertEquals(new HashSet<>(expected), toSet(results));
assertEquals(toList(expected.descendingIterator()), toList(results));
}
}

Expand Down Expand Up @@ -341,7 +347,7 @@ public void shouldFindSessionsToMerge() {
Arrays.asList(KeyValue.pair(session2, 2L), KeyValue.pair(session3, 3L));

try (final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessions("a", 150, 300)) {
assertEquals(new HashSet<>(expected), toSet(results));
assertEquals(expected, toList(results));
}
}

Expand All @@ -359,10 +365,10 @@ public void shouldBackwardFindSessionsToMerge() {
sessionStore.put(session5, 5L);

final List<KeyValue<Windowed<String>, Long>> expected =
asList(KeyValue.pair(session2, 2L), KeyValue.pair(session3, 3L));
asList(KeyValue.pair(session3, 3L), KeyValue.pair(session2, 2L));

try (final KeyValueIterator<Windowed<String>, Long> results = sessionStore.backwardFindSessions("a", 150, 300)) {
assertEquals(new HashSet<>(expected), toSet(results));
assertEquals(expected, toList(results));
}
}

Expand Down Expand Up @@ -400,7 +406,7 @@ public void shouldFetchExactKeys() {
try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.findSessions("a", "aa", 10, 0)
) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L))));
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L))));
}
}

Expand Down Expand Up @@ -438,7 +444,7 @@ public void shouldBackwardFetchExactKeys() {
try (final KeyValueIterator<Windowed<String>, Long> iterator =
sessionStore.backwardFindSessions("a", "aa", 10, 0)
) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L))));
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L))));
}
}

Expand All @@ -463,12 +469,20 @@ public void shouldFetchAndIterateOverExactBinaryKeys() {
sessionStore.put(new Windowed<>(key2, new SessionWindow(8, 100)), "8");
sessionStore.put(new Windowed<>(key3, new SessionWindow(9, 100)), "9");

final Set<String> expectedKey1 = new HashSet<>(asList("1", "4", "7"));
assertThat(valuesToSet(sessionStore.findSessions(key1, 0L, Long.MAX_VALUE)), equalTo(expectedKey1));
final Set<String> expectedKey2 = new HashSet<>(asList("2", "5", "8"));
assertThat(valuesToSet(sessionStore.findSessions(key2, 0L, Long.MAX_VALUE)), equalTo(expectedKey2));
final Set<String> expectedKey3 = new HashSet<>(asList("3", "6", "9"));
assertThat(valuesToSet(sessionStore.findSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3));
final List<String> expectedKey1 = asList("1", "4", "7");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.findSessions(key1, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey1)));
}

final List<String> expectedKey2 = asList("2", "5", "8");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.findSessions(key2, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey2)));
}

final List<String> expectedKey3 = asList("3", "6", "9");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.findSessions(key3, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey3)));
}

sessionStore.close();
}
Expand All @@ -494,12 +508,21 @@ public void shouldBackwardFetchAndIterateOverExactBinaryKeys() {
sessionStore.put(new Windowed<>(key2, new SessionWindow(8, 100)), "8");
sessionStore.put(new Windowed<>(key3, new SessionWindow(9, 100)), "9");

final Set<String> expectedKey1 = new HashSet<>(asList("1", "4", "7"));
assertThat(valuesToSet(sessionStore.backwardFindSessions(key1, 0L, Long.MAX_VALUE)), equalTo(expectedKey1));
final Set<String> expectedKey2 = new HashSet<>(asList("2", "5", "8"));
assertThat(valuesToSet(sessionStore.backwardFindSessions(key2, 0L, Long.MAX_VALUE)), equalTo(expectedKey2));
final Set<String> expectedKey3 = new HashSet<>(asList("3", "6", "9"));
assertThat(valuesToSet(sessionStore.backwardFindSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3));

final List<String> expectedKey1 = asList("7", "4", "1");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.backwardFindSessions(key1, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey1)));
}

final List<String> expectedKey2 = asList("8", "5", "2");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.backwardFindSessions(key2, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey2)));
}

final List<String> expectedKey3 = asList("9", "6", "3");
try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.backwardFindSessions(key3, 0L, Long.MAX_VALUE)) {
assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey3)));
}

sessionStore.close();
}
Expand Down Expand Up @@ -550,13 +573,13 @@ public void shouldRestore() {
}

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.fetch("a")) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(expected, toList(values));
}

sessionStore.close();

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.fetch("a")) {
assertEquals(Collections.emptySet(), toSet(values));
assertEquals(Collections.emptyList(), toList(values));
}


Expand All @@ -568,7 +591,7 @@ public void shouldRestore() {
context.restore(sessionStore.name(), changeLog);

try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.fetch("a")) {
assertEquals(new HashSet<>(expected), toSet(values));
assertEquals(expected, toList(values));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void shouldRemove() {
cachingStore.remove(a);

try (final KeyValueIterator<Windowed<Bytes>, byte[]> rangeIter =
cachingStore.findSessions(keyA, 0, 0)) {
cachingStore.findSessions(keyA, 0, 0)) {
assertFalse(rangeIter.hasNext());

assertNull(cachingStore.fetchSession(keyA, 0, 0));
Expand Down
Loading

0 comments on commit 361b784

Please sign in to comment.