From cc89c9fe5f8fea22e72331130a92218296f39323 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 29 Sep 2021 08:31:29 +0800 Subject: [PATCH] KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue (#11337) In #9139, we added backward iterator on SessionStore. But there is a bug that when fetch/backwardFetch the key range, if there are multiple records in the same session window, we can't return the data in the correct order. Reviewers: Anna Sophie Blee-Goldman --- .../state/internals/InMemorySessionStore.java | 4 +- .../AbstractSessionBytesStoreTest.java | 147 ++++++----- .../CachingInMemorySessionStoreTest.java | 11 +- .../internals/SessionStoreFetchTest.java | 232 ++++++++++++++++++ 4 files changed, 323 insertions(+), 71 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index 722ed43ff0cae..a14d3e11cefec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -281,7 +281,7 @@ public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, fina removeExpiredSegments(); - return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false); + return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), true); } @Override @@ -292,7 +292,7 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFr removeExpiredSegments(); return registerNewIterator( - keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true); + keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), false); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java index 6a4fb7113f017..e240ccee93d21 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java @@ -47,15 +47,15 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.test.StreamsTestUtils.toSet; +import static org.apache.kafka.common.utils.Utils.toList; import static org.apache.kafka.test.StreamsTestUtils.valuesToSet; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; @@ -121,7 +121,7 @@ public void shouldPutAndFindSessionsInRange() { try (final KeyValueIterator, Long> values = sessionStore.findSessions(key, 0, 1000L) ) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(expected, toList(values)); } final List, Long>> expected2 = @@ -129,7 +129,7 @@ public void shouldPutAndFindSessionsInRange() { try (final KeyValueIterator, Long> values2 = sessionStore.findSessions(key, 400L, 600L) ) { - assertEquals(new HashSet<>(expected2), toSet(values2)); + assertEquals(expected2, toList(values2)); } } @@ -143,28 +143,29 @@ public void shouldPutAndBackwardFindSessionsInRange() { sessionStore.put(new Windowed<>(key, new SessionWindow(1500L, 2000L)), 1L); sessionStore.put(new Windowed<>(key, new SessionWindow(2500L, 3000L)), 2L); - final List, Long>> expected = - asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L)); + final LinkedList, Long>> expected = new LinkedList<>(); + expected.add(KeyValue.pair(a1, 1L)); + expected.add(KeyValue.pair(a2, 2L)); try (final KeyValueIterator, Long> values = sessionStore.backwardFindSessions(key, 0, 1000L)) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(toList(expected.descendingIterator()), toList(values)); } final List, Long>> expected2 = Collections.singletonList(KeyValue.pair(a2, 2L)); try (final KeyValueIterator, Long> values2 = sessionStore.backwardFindSessions(key, 400L, 600L)) { - assertEquals(new HashSet<>(expected2), toSet(values2)); + assertEquals(expected2, toList(values2)); } } @Test public void shouldFetchAllSessionsWithSameRecordKey() { - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), - KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L), - KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L), - KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)); + final LinkedList, Long>> expected = new LinkedList<>(); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L)); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L)); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L)); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)); for (final KeyValue, Long> kv : expected) { sessionStore.put(kv.key, kv.value); @@ -174,18 +175,17 @@ public void shouldFetchAllSessionsWithSameRecordKey() { sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(expected, toList(values)); } } @Test public void shouldBackwardFetchAllSessionsWithSameRecordKey() { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), - KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L), - KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L), - KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L) - ); + final LinkedList, Long>> expected = new LinkedList<>(); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L)); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L)); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L)); + expected.add(KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)); for (final KeyValue, Long> kv : expected) { sessionStore.put(kv.key, kv.value); @@ -195,18 +195,18 @@ public void shouldBackwardFetchAllSessionsWithSameRecordKey() { sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); try (final KeyValueIterator, Long> values = sessionStore.backwardFetch("a")) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(toList(expected.descendingIterator()), toList(values)); } } @Test public void shouldFetchAllSessionsWithinKeyRange() { - final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L), - KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L), - - KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L), - KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L)); + final List, Long>> expected = new LinkedList<>(); + expected.add(KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L)); + expected.add(KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L)); + expected.add(KeyValue.pair(new Windowed<>("aaaa", new SessionWindow(100, 100)), 6L)); + expected.add(KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L)); + expected.add(KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L)); for (final KeyValue, Long> kv : expected) { sessionStore.put(kv.key, kv.value); @@ -217,19 +217,22 @@ public void shouldFetchAllSessionsWithinKeyRange() { sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L); try (final KeyValueIterator, Long> values = sessionStore.fetch("aa", "bb")) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator, Long> values = sessionStore.findSessions("aa", "bb", 0L, Long.MAX_VALUE)) { + assertEquals(expected, toList(values)); } } @Test public void shouldBackwardFetchAllSessionsWithinKeyRange() { - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L), - KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L), - - KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L), - KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L) - ); + final LinkedList, Long>> expected = new LinkedList<>(); + expected.add(KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L)); + expected.add(KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L)); + expected.add(KeyValue.pair(new Windowed<>("aaaa", new SessionWindow(100, 100)), 6L)); + expected.add(KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L)); + expected.add(KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L)); for (final KeyValue, Long> kv : expected) { sessionStore.put(kv.key, kv.value); @@ -240,7 +243,11 @@ public void shouldBackwardFetchAllSessionsWithinKeyRange() { sessionStore.put(new Windowed<>("bbb", new SessionWindow(2500, 3000)), 6L); try (final KeyValueIterator, Long> values = sessionStore.backwardFetch("aa", "bb")) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(toList(expected.descendingIterator()), toList(values)); + } + + try (final KeyValueIterator, Long> values = sessionStore.backwardFindSessions("aa", "bb", 0L, Long.MAX_VALUE)) { + assertEquals(toList(expected.descendingIterator()), toList(values)); } } @@ -272,7 +279,7 @@ public void shouldFindValuesWithinMergingSessionWindowRange() { KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L)); try (final KeyValueIterator, Long> results = sessionStore.findSessions(key, -1, 1000L)) { - assertEquals(new HashSet<>(expected), toSet(results)); + assertEquals(expected, toList(results)); } } @@ -282,13 +289,12 @@ public void shouldBackwardFindValuesWithinMergingSessionWindowRange() { sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L); sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L); - final List, Long>> expected = asList( - KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L), - KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L) - ); + final LinkedList, Long>> expected = new LinkedList<>(); + expected.add(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L)); + expected.add(KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L)); try (final KeyValueIterator, Long> results = sessionStore.backwardFindSessions(key, -1, 1000L)) { - assertEquals(new HashSet<>(expected), toSet(results)); + assertEquals(toList(expected.descendingIterator()), toList(results)); } } @@ -341,7 +347,7 @@ public void shouldFindSessionsToMerge() { Arrays.asList(KeyValue.pair(session2, 2L), KeyValue.pair(session3, 3L)); try (final KeyValueIterator, Long> results = sessionStore.findSessions("a", 150, 300)) { - assertEquals(new HashSet<>(expected), toSet(results)); + assertEquals(expected, toList(results)); } } @@ -359,10 +365,10 @@ public void shouldBackwardFindSessionsToMerge() { sessionStore.put(session5, 5L); final List, Long>> expected = - asList(KeyValue.pair(session2, 2L), KeyValue.pair(session3, 3L)); + asList(KeyValue.pair(session3, 3L), KeyValue.pair(session2, 2L)); try (final KeyValueIterator, Long> results = sessionStore.backwardFindSessions("a", 150, 300)) { - assertEquals(new HashSet<>(expected), toSet(results)); + assertEquals(expected, toList(results)); } } @@ -400,7 +406,7 @@ public void shouldFetchExactKeys() { try (final KeyValueIterator, Long> iterator = sessionStore.findSessions("a", "aa", 10, 0) ) { - assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L)))); + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L)))); } } @@ -438,7 +444,7 @@ public void shouldBackwardFetchExactKeys() { try (final KeyValueIterator, Long> iterator = sessionStore.backwardFindSessions("a", "aa", 10, 0) ) { - assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L)))); + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L)))); } } @@ -463,12 +469,20 @@ public void shouldFetchAndIterateOverExactBinaryKeys() { sessionStore.put(new Windowed<>(key2, new SessionWindow(8, 100)), "8"); sessionStore.put(new Windowed<>(key3, new SessionWindow(9, 100)), "9"); - final Set expectedKey1 = new HashSet<>(asList("1", "4", "7")); - assertThat(valuesToSet(sessionStore.findSessions(key1, 0L, Long.MAX_VALUE)), equalTo(expectedKey1)); - final Set expectedKey2 = new HashSet<>(asList("2", "5", "8")); - assertThat(valuesToSet(sessionStore.findSessions(key2, 0L, Long.MAX_VALUE)), equalTo(expectedKey2)); - final Set expectedKey3 = new HashSet<>(asList("3", "6", "9")); - assertThat(valuesToSet(sessionStore.findSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3)); + final List expectedKey1 = asList("1", "4", "7"); + try (KeyValueIterator, String> iterator = sessionStore.findSessions(key1, 0L, Long.MAX_VALUE)) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey1))); + } + + final List expectedKey2 = asList("2", "5", "8"); + try (KeyValueIterator, String> iterator = sessionStore.findSessions(key2, 0L, Long.MAX_VALUE)) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey2))); + } + + final List expectedKey3 = asList("3", "6", "9"); + try (KeyValueIterator, String> iterator = sessionStore.findSessions(key3, 0L, Long.MAX_VALUE)) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey3))); + } sessionStore.close(); } @@ -494,12 +508,21 @@ public void shouldBackwardFetchAndIterateOverExactBinaryKeys() { sessionStore.put(new Windowed<>(key2, new SessionWindow(8, 100)), "8"); sessionStore.put(new Windowed<>(key3, new SessionWindow(9, 100)), "9"); - final Set expectedKey1 = new HashSet<>(asList("1", "4", "7")); - assertThat(valuesToSet(sessionStore.backwardFindSessions(key1, 0L, Long.MAX_VALUE)), equalTo(expectedKey1)); - final Set expectedKey2 = new HashSet<>(asList("2", "5", "8")); - assertThat(valuesToSet(sessionStore.backwardFindSessions(key2, 0L, Long.MAX_VALUE)), equalTo(expectedKey2)); - final Set expectedKey3 = new HashSet<>(asList("3", "6", "9")); - assertThat(valuesToSet(sessionStore.backwardFindSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3)); + + final List expectedKey1 = asList("7", "4", "1"); + try (KeyValueIterator, String> iterator = sessionStore.backwardFindSessions(key1, 0L, Long.MAX_VALUE)) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey1))); + } + + final List expectedKey2 = asList("8", "5", "2"); + try (KeyValueIterator, String> iterator = sessionStore.backwardFindSessions(key2, 0L, Long.MAX_VALUE)) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey2))); + } + + final List expectedKey3 = asList("9", "6", "3"); + try (KeyValueIterator, String> iterator = sessionStore.backwardFindSessions(key3, 0L, Long.MAX_VALUE)) { + assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey3))); + } sessionStore.close(); } @@ -548,13 +571,13 @@ public void shouldRestore() { } try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(expected, toList(values)); } sessionStore.close(); try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { - assertEquals(Collections.emptySet(), toSet(values)); + assertEquals(Collections.emptyList(), toList(values)); } @@ -566,7 +589,7 @@ public void shouldRestore() { context.restore(sessionStore.name(), changeLog); try (final KeyValueIterator, Long> values = sessionStore.fetch("a")) { - assertEquals(new HashSet<>(expected), toSet(values)); + assertEquals(expected, toList(values)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java index 67258e74177ab..1b44b91906ac6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java @@ -326,13 +326,10 @@ public void shouldRemove() { cachingStore.put(b, "2".getBytes()); cachingStore.remove(a); - final KeyValueIterator, byte[]> rangeIter = - cachingStore.findSessions(keyA, 0, 0); - assertFalse(rangeIter.hasNext()); - - assertNull(cachingStore.fetchSession(keyA, 0, 0)); - assertThat(cachingStore.fetchSession(keyB, 0, 0), equalTo("2".getBytes())); - + try (final KeyValueIterator, byte[]> rangeIter = + cachingStore.findSessions(keyA, 0, 0)) { + assertFalse(rangeIter.hasNext()); + } } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java new file mode 100644 index 0000000000000..1e274a68a3af6 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreFetchTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.SessionWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.function.Supplier; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; + +@RunWith(Parameterized.class) +public class SessionStoreFetchTest { + private enum StoreType { InMemory, RocksDB }; + private static final String STORE_NAME = "store"; + private static final int DATA_SIZE = 5; + private static final long WINDOW_SIZE = 500L; + private static final long RETENTION_MS = 10000L; + + private StoreType storeType; + private boolean enableLogging; + private boolean enableCaching; + private boolean forward; + + private LinkedList, Long>> expectedRecords; + private LinkedList> records; + private Properties streamsConfig; + + public SessionStoreFetchTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) { + this.storeType = storeType; + this.enableLogging = enableLogging; + this.enableCaching = enableCaching; + this.forward = forward; + + this.records = new LinkedList<>(); + this.expectedRecords = new LinkedList<>(); + final int m = DATA_SIZE / 2; + for (int i = 0; i < DATA_SIZE; i++) { + final String keyStr = i < m ? "a" : "b"; + final String key = "key-" + keyStr; + final String key2 = "key-" + keyStr + keyStr; + final String value = "val-" + i; + final KeyValue r = new KeyValue<>(key, value); + final KeyValue r2 = new KeyValue<>(key2, value); + records.add(r); + records.add(r2); + } + expectedRecords.add(new KeyValue<>(new Windowed<>("key-a", new SessionWindow(0, 500)), 4L)); + expectedRecords.add(new KeyValue<>(new Windowed<>("key-aa", new SessionWindow(0, 500)), 4L)); + expectedRecords.add(new KeyValue<>(new Windowed<>("key-b", new SessionWindow(1500, 2000)), 6L)); + expectedRecords.add(new KeyValue<>(new Windowed<>("key-bb", new SessionWindow(1500, 2000)), 6L)); + } + + @Rule + public TestName testName = new TestName(); + + @Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}") + public static Collection data() { + final List types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB); + final List logging = Arrays.asList(true, false); + final List caching = Arrays.asList(true, false); + final List forward = Arrays.asList(true, false); + return buildParameters(types, logging, caching, forward); + } + + @Before + public void setup() { + streamsConfig = mkProperties(mkMap( + mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) + )); + } + + @Test + public void testStoreConfig() { + final Materialized> stateStoreConfig = getStoreConfig(storeType, STORE_NAME, enableLogging, enableCaching); + //Create topology: table from input topic + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream stream = builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); + stream. + groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(WINDOW_SIZE))) + .count(stateStoreConfig) + .toStream() + .to("output"); + + final Topology topology = builder.build(); + + try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) { + //get input topic and stateStore + final TestInputTopic input = driver + .createInputTopic("input", new StringSerializer(), new StringSerializer()); + final SessionStore stateStore = driver.getSessionStore(STORE_NAME); + + //write some data + final int medium = DATA_SIZE / 2 * 2; + for (int i = 0; i < records.size(); i++) { + final KeyValue kv = records.get(i); + final long windowStartTime = i < medium ? 0 : 1500; + input.pipeInput(kv.key, kv.value, windowStartTime); + input.pipeInput(kv.key, kv.value, windowStartTime + WINDOW_SIZE); + } + + // query the state store + try (final KeyValueIterator, Long> scanIterator = forward ? + stateStore.fetch("key-a", "key-bb") : + stateStore.backwardFetch("key-a", "key-bb")) { + + final Iterator, Long>> dataIterator = forward ? + expectedRecords.iterator() : + expectedRecords.descendingIterator(); + + TestUtils.checkEquals(scanIterator, dataIterator); + } + + try (final KeyValueIterator, Long> scanIterator = forward ? + stateStore.findSessions("key-a", "key-bb", 0L, Long.MAX_VALUE) : + stateStore.backwardFindSessions("key-a", "key-bb", 0L, Long.MAX_VALUE)) { + + final Iterator, Long>> dataIterator = forward ? + expectedRecords.iterator() : + expectedRecords.descendingIterator(); + + TestUtils.checkEquals(scanIterator, dataIterator); + } + } + } + + private static Collection buildParameters(final List... argOptions) { + List result = new LinkedList<>(); + result.add(new Object[0]); + + for (final List argOption : argOptions) { + result = times(result, argOption); + } + + return result; + } + + private static List times(final List left, final List right) { + final List result = new LinkedList<>(); + for (final Object[] args : left) { + for (final Object rightElem : right) { + final Object[] resArgs = new Object[args.length + 1]; + System.arraycopy(args, 0, resArgs, 0, args.length); + resArgs[args.length] = rightElem; + result.add(resArgs); + } + } + return result; + } + + private Materialized> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) { + final Supplier createStore = () -> { + if (type == StoreType.InMemory) { + return Stores.inMemorySessionStore(STORE_NAME, Duration.ofMillis(RETENTION_MS)); + } else if (type == StoreType.RocksDB) { + return Stores.persistentSessionStore(STORE_NAME, Duration.ofMillis(RETENTION_MS)); + } else { + return Stores.inMemorySessionStore(STORE_NAME, Duration.ofMillis(RETENTION_MS)); + } + }; + + final SessionBytesStoreSupplier stateStoreSupplier = createStore.get(); + final Materialized> stateStoreConfig = Materialized + .as(stateStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long()); + if (cachingEnabled) { + stateStoreConfig.withCachingEnabled(); + } else { + stateStoreConfig.withCachingDisabled(); + } + if (loggingEnabled) { + stateStoreConfig.withLoggingEnabled(new HashMap()); + } else { + stateStoreConfig.withLoggingDisabled(); + } + return stateStoreConfig; + } +}