diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c1b81c66f37fa..03eaa07057078 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -29,8 +29,6 @@
 import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 
@@ -74,8 +72,6 @@
 @InterfaceStability.Evolving
 public class Stores {
 
-    private static final Logger log = LoggerFactory.getLogger(Stores.class);
-
     /**
      * Create a persistent {@link KeyValueBytesStoreSupplier}.
     * @param name  name of the store (cannot be {@code null})
@@ -195,7 +191,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
     * Create a persistent {@link WindowBytesStoreSupplier}.
     * @param name                  name of the store (cannot be {@code null})
     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
-     * @param segmentInterval       size of segments in ms (must be at least one minute)
+     * @param segmentInterval       size of segments in ms (cannot be zero or negative)
     * @param windowSize            size of the windows (cannot be negative)
     * @param retainDuplicates      whether or not to retain duplicates.
     * @return an instance of {@link WindowBytesStoreSupplier}
@@ -206,14 +202,14 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                              final boolean retainDuplicates,
                                                              final long segmentInterval) {
        Objects.requireNonNull(name, "name cannot be null");
-        if (retentionPeriod < 0) {
+        if (retentionPeriod < 0L) {
            throw new IllegalArgumentException("retentionPeriod cannot be negative");
        }
-        if (windowSize < 0) {
+        if (windowSize < 0L) {
            throw new IllegalArgumentException("windowSize cannot be negative");
        }
-        if (segmentInterval < 60_000) {
-            throw new IllegalArgumentException("segmentInterval must be at least one minute");
+        if (segmentInterval < 1L) {
+            throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
        }
 
        return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 23f246d001ec9..d0da15880cc19 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -38,6 +38,7 @@ public void shouldThrowIfPersistentKeyValueStoreStoreNameIsNull() {
 
    @Test(expected = NullPointerException.class)
    public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() {
+        //noinspection ResultOfMethodCallIgnored
        Stores.inMemoryKeyValueStore(null);
    }
 
@@ -53,12 +54,12 @@ public void shouldThrowIfILruMapStoreCapacityIsNegative() {
 
    @Test(expected = NullPointerException.class)
    public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
-        Stores.persistentWindowStore(null, 0L, 0L, false, 60_000L);
+        Stores.persistentWindowStore(null, 0L, 0L, false, 0L);
    }
 
    @Test(expected = IllegalArgumentException.class)
    public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
-        Stores.persistentWindowStore("anyName", -1L, 0L, false, 60_000L);
+        Stores.persistentWindowStore("anyName", -1L, 0L, false, 0L);
    }
 
    @Deprecated
@@ -74,7 +75,7 @@ public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() {
 
    @Test(expected = IllegalArgumentException.class)
    public void shouldThrowIfIPersistentWindowStoreIfSegmentIntervalIsTooSmall() {
-        Stores.persistentWindowStore("anyName", 1L, 1L, false, 59_999L);
+        Stores.persistentWindowStore("anyName", 1L, 1L, false, -1L);
    }
 
    @Test(expected = NullPointerException.class)
@@ -129,25 +130,31 @@ public void shouldCreateRocksDbSessionStore() {
 
    @Test
    public void shouldBuildWindowStore() {
-        final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3L, 3L, true),
-                                                                            Serdes.String(),
-                                                                            Serdes.String()).build();
+        final WindowStore<String, String> store = Stores.windowStoreBuilder(
+            Stores.persistentWindowStore("store", 3L, 3L, true),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
        assertThat(store, not(nullValue()));
    }
 
    @Test
    public void shouldBuildKeyValueStore() {
-        final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("name"),
-                                                                                Serdes.String(),
-                                                                                Serdes.String()).build();
+        final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore("name"),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
        assertThat(store, not(nullValue()));
    }
 
    @Test
    public void shouldBuildSessionStore() {
-        final SessionStore<String, String> store = Stores.sessionStoreBuilder(Stores.persistentSessionStore("name", 10),
-                                                                              Serdes.String(),
-                                                                              Serdes.String()).build();
+        final SessionStore<String, String> store = Stores.sessionStoreBuilder(
+            Stores.persistentSessionStore("name", 10),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
        assertThat(store, not(nullValue()));
    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 194edb15f0a79..47e79c96f8e87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -23,7 +23,6 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -38,9 +37,12 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 
+import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
@@ -49,14 +51,15 @@
 import static org.junit.Assert.assertFalse;
 
+@SuppressWarnings("PointlessArithmeticExpression")
 public class CachingSessionStoreTest {
 
    private static final int MAX_CACHE_SIZE_BYTES = 600;
-    private InternalMockProcessorContext context;
+    private static final Long DEFAULT_TIMESTAMP = 10L;
+    private static final long SEGMENT_INTERVAL = 100L;
    private RocksDBSegmentedBytesStore underlying;
    private CachingSessionStore<String, String> cachingStore;
    private ThreadCache cache;
-    private static final Long DEFAULT_TIMESTAMP = 10L;
    private final Bytes keyA = Bytes.wrap("a".getBytes());
    private final Bytes keyAA = Bytes.wrap("aa".getBytes());
    private final Bytes keyB = Bytes.wrap("b".getBytes());
@@ -65,17 +68,11 @@ public class CachingSessionStoreTest {
    public void setUp() {
        final SessionKeySchema schema = new SessionKeySchema();
        schema.init("topic");
-        final int retention = 60000;
-        final int segmentInterval = 60_000;
-        underlying = new RocksDBSegmentedBytesStore("test", retention, segmentInterval, schema);
+        underlying = new RocksDBSegmentedBytesStore("test", 0L, SEGMENT_INTERVAL, schema);
        final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
-        cachingStore = new CachingSessionStore<>(sessionStore,
-                                                 Serdes.String(),
-                                                 Serdes.String(),
-                                                 segmentInterval
-        );
+        cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.String(), SEGMENT_INTERVAL);
        cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic", null));
        cachingStore.init(context, cachingStore);
    }
@@ -134,11 +131,13 @@ public void shouldPutFetchRangeFromCache() {
 
    @Test
    public void shouldFetchAllSessionsWithSameRecordKey() {
-        final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()),
-                                                                               KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()),
-                                                                               KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()),
-                                                                               KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes()));
-        for (KeyValue<Windowed<Bytes>, byte[]> kv : expected) {
+        final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(
+            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()),
+            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()),
+            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()),
+            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes())
+        );
+        for (final KeyValue<Windowed<Bytes>, byte[]> kv : expected) {
            cachingStore.put(kv.key, kv.value);
        }
 
@@ -184,14 +183,14 @@ public void shouldRemove() {
 
    @Test
    public void shouldFetchCorrectlyAcrossSegments() {
-        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
-        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(60_000, 60_000));
-        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(120_000, 120_000));
+        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
        cachingStore.put(a1, "1".getBytes());
        cachingStore.put(a2, "2".getBytes());
        cachingStore.put(a3, "3".getBytes());
        cachingStore.flush();
-        final KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.findSessions(keyA, 0, 60_000 * 2);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 2);
        assertEquals(a1, results.next().key);
        assertEquals(a2, results.next().key);
        assertEquals(a3, results.next().key);
@@ -200,11 +199,11 @@ public void shouldFetchCorrectlyAcrossSegments() {
 
    @Test
    public void shouldFetchRangeCorrectlyAcrossSegments() {
-        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
-        final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(0, 0));
-        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(60_000, 60_000));
-        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(60_000 * 2, 60_000 * 2));
-        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(60_000 * 2, 60_000 * 2));
+        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
        cachingStore.put(a1, "1".getBytes());
        cachingStore.put(aa1, "1".getBytes());
        cachingStore.put(a2, "2".getBytes());
@@ -212,13 +211,13 @@ public void shouldFetchRangeCorrectlyAcrossSegments() {
        cachingStore.put(aa3, "3".getBytes());
        cachingStore.flush();
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, 60_000 * 2);
-        assertEquals(a1, rangeResults.next().key);
-        assertEquals(aa1, rangeResults.next().key);
-        assertEquals(a2, rangeResults.next().key);
-        assertEquals(a3, rangeResults.next().key);
-        assertEquals(aa3, rangeResults.next().key);
-        assertFalse(rangeResults.hasNext());
+        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2);
+        final Set<Windowed<Bytes>> keys = new HashSet<>();
+        while (rangeResults.hasNext()) {
+            keys.add(rangeResults.next().key);
+        }
+        rangeResults.close();
+        assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);
    }
 
    @Test
@@ -226,25 +225,28 @@ public void shouldForwardChangedValuesDuringFlush() {
        final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
        final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
        final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
-        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
-            @Override
-            public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
-                flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
-            }
-        }, true);
-
+        cachingStore.setFlushListener(
+            (key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
+            true
+        );
+
        cachingStore.put(a, "1".getBytes());
        cachingStore.flush();
-
+
        cachingStore.put(a, "2".getBytes());
        cachingStore.flush();
 
        cachingStore.remove(a);
        cachingStore.flush();
 
-        assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
-                                            KeyValue.pair(aDeserialized, new Change<>("2", "1")),
-                                            KeyValue.pair(aDeserialized, new Change<>(null, "2"))));
+        assertEquals(
+            flushed,
+            Arrays.asList(
+                KeyValue.pair(aDeserialized, new Change<>("1", null)),
+                KeyValue.pair(aDeserialized, new Change<>("2", "1")),
+                KeyValue.pair(aDeserialized, new Change<>(null, "2"))
+            )
+        );
    }
 
    @Test
@@ -252,12 +254,10 @@ public void shouldForwardChangedValuesDuringFlushWhenSendOldValuesDisabledNewRec
        final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
        final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
        final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
-        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
-            @Override
-            public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
-                flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
-            }
-        }, false);
+        cachingStore.setFlushListener(
+            (key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
+            false
+        );
 
        cachingStore.put(a, "1".getBytes());
        cachingStore.flush();
@@ -268,9 +268,14 @@ public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
        cachingStore.remove(a);
        cachingStore.flush();
 
-        assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
-                                            KeyValue.pair(aDeserialized, new Change<>("2", null)),
-                                            KeyValue.pair(aDeserialized, new Change<>(null, "2"))));
+        assertEquals(
+            flushed,
+            Arrays.asList(
+                KeyValue.pair(aDeserialized, new Change<>("1", null)),
+                KeyValue.pair(aDeserialized, new Change<>("2", null)),
+                KeyValue.pair(aDeserialized, new Change<>(null, "2"))
+            )
+        );
    }
 
    @Test
@@ -278,12 +283,10 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
        final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
        final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
        final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
-        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
-            @Override
-            public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
-                flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
-            }
-        }, false);
+        cachingStore.setFlushListener(
+            (key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
+            false
+        );
 
        cachingStore.put(a, "1".getBytes());
        cachingStore.flush();
@@ -292,8 +295,13 @@ public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
        cachingStore.flush();
 
-        assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
-                                            KeyValue.pair(aDeserialized, new Change<>("2", null))));
+        assertEquals(
+            flushed,
+            Arrays.asList(
+                KeyValue.pair(aDeserialized, new Change<>("1", null)),
+                KeyValue.pair(aDeserialized, new Change<>("2", null))
+            )
+        );
    }
 
    @Test
@@ -369,7 +377,7 @@ public void shouldThrowNullPointerExceptionOnPutNullKey() {
        cachingStore.put(null, "1".getBytes());
    }
 
-    private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(final String...sessionIds) {
+    private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(final String... sessionIds) {
        final Random random = new Random();
        final List<KeyValue<Windowed<Bytes>, byte[]>> results = new ArrayList<>();
        while (cache.size() == results.size()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 118acecc6c15b..3bcb1a29b2457 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -26,7 +26,6 @@
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -35,7 +34,6 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.List;
 
 import static org.apache.kafka.common.utils.Utils.mkList;
@@ -55,7 +53,8 @@ public class CachingWindowStoreTest {
 
    private static final int MAX_CACHE_SIZE_BYTES = 150;
    private static final long DEFAULT_TIMESTAMP = 10L;
-    private static final Long WINDOW_SIZE = 10000L;
+    private static final Long WINDOW_SIZE = 10L;
+    private static final long SEGMENT_INTERVAL = 100L;
    private InternalMockProcessorContext context;
    private RocksDBSegmentedBytesStore underlying;
    private CachingWindowStore<String, String> cachingStore;
@@ -67,20 +66,14 @@ public class CachingWindowStoreTest {
    @Before
    public void setUp() {
        keySchema = new WindowKeySchema();
-        final int retention = 60_000;
-        final int segmentInterval = 60_000;
-        underlying = new RocksDBSegmentedBytesStore("test", retention, segmentInterval, keySchema);
+        underlying = new RocksDBSegmentedBytesStore("test", 0, SEGMENT_INTERVAL, keySchema);
        final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false, WINDOW_SIZE);
        cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
-        cachingStore = new CachingWindowStore<>(windowStore,
-                                                Serdes.String(),
-                                                Serdes.String(),
-                                                WINDOW_SIZE,
-                                                segmentInterval);
+        cachingStore = new CachingWindowStore<>(windowStore, Serdes.String(), Serdes.String(), WINDOW_SIZE, SEGMENT_INTERVAL);
        cachingStore.setFlushListener(cacheListener, false);
        cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
        topic = "topic";
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
+        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic, null));
        cachingStore.init(context, cachingStore);
    }
@@ -133,7 +126,7 @@ public void shouldPutFetchRangeFromCache() {
        assertFalse(iterator.hasNext());
        assertEquals(2, cache.size());
    }
-    
+
    @Test
    public void shouldGetAllFromCache() {
        cachingStore.put(bytesKey("a"), bytesValue("a"));
@@ -146,46 +139,46 @@ public void shouldGetAllFromCache() {
        cachingStore.put(bytesKey("h"), bytesValue("h"));
 
        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.all();
-        String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
-        for (String s : array) {
+        final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+        for (final String s : array) {
            verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), s);
        }
        assertFalse(iterator.hasNext());
    }
-    
+
    @Test
    public void shouldFetchAllWithinTimestampRange() {
-        String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+        final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
        for (int i = 0; i < array.length; i++) {
            context.setTime(i);
            cachingStore.put(bytesKey(array[i]), bytesValue(array[i]));
        }
-        
+
        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.fetchAll(0, 7);
        for (int i = 0; i < array.length; i++) {
-            String str = array[i];
+            final String str = array[i];
            verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
        }
        assertFalse(iterator.hasNext());
-        
+
        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 = cachingStore.fetchAll(2, 4);
        for (int i = 2; i <= 4; i++) {
-            String str = array[i];
+            final String str = array[i];
            verifyWindowedKeyValue(iterator1.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
        }
        assertFalse(iterator1.hasNext());
-        
+
        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 = cachingStore.fetchAll(5, 7);
        for (int i = 5; i <= 7; i++) {
-            String str = array[i];
+            final String str = array[i];
            verifyWindowedKeyValue(iterator2.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
        }
        assertFalse(iterator2.hasNext());
    }
 
    @Test
-    public void shouldFlushEvictedItemsIntoUnderlyingStore() throws IOException {
-        int added = addItemsToCache();
+    public void shouldFlushEvictedItemsIntoUnderlyingStore() {
+        final int added = addItemsToCache();
        // all dirty entries should have been flushed
        final KeyValueIterator<Bytes, byte[]> iter = underlying.fetch(Bytes.wrap("0".getBytes()), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP);
        final KeyValue<Bytes, byte[]> next = iter.next();
@@ -228,8 +221,8 @@ public void shouldForwardOldValuesWhenDisabled() {
    }
 
    @Test
-    public void shouldForwardDirtyItemToListenerWhenEvicted() throws IOException {
-        int numRecords = addItemsToCache();
+    public void shouldForwardDirtyItemToListenerWhenEvicted() {
+        final int numRecords = addItemsToCache();
        assertEquals(numRecords, cacheListener.forwarded.size());
    }
 
@@ -257,7 +250,7 @@ public void shouldIterateAcrossWindows() {
 
    @Test
    public void shouldIterateCacheAndStore() {
-        final Bytes key = Bytes.wrap("1" .getBytes());
+        final Bytes key = Bytes.wrap("1".getBytes());
        underlying.put(WindowKeySchema.toStoreKeyBinary(key, DEFAULT_TIMESTAMP, 0), "a".getBytes());
        cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE);
        final WindowStoreIterator<byte[]> fetch = cachingStore.fetch(bytesKey("1"), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE);
@@ -268,7 +261,7 @@ public void shouldIterateCacheAndStore() {
 
    @Test
    public void shouldIterateCacheAndStoreKeyRange() {
-        final Bytes key = Bytes.wrap("1" .getBytes());
+        final Bytes key = Bytes.wrap("1".getBytes());
        underlying.put(WindowKeySchema.toStoreKeyBinary(key, DEFAULT_TIMESTAMP, 0), "a".getBytes());
        cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE);
 
@@ -311,9 +304,13 @@ public void shouldFetchAndIterateOverExactKeys() {
        cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);
        cachingStore.put(bytesKey("a"), bytesValue("0003"), 1);
        cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1);
-        cachingStore.put(bytesKey("a"), bytesValue("0005"), 60000);
+        cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
 
-        final List<KeyValue<Long, byte[]>> expected = mkList(KeyValue.pair(0L, bytesValue("0001")), KeyValue.pair(1L, bytesValue("0003")), KeyValue.pair(60000L, bytesValue("0005")));
+        final List<KeyValue<Long, byte[]>> expected = mkList(
+            KeyValue.pair(0L, bytesValue("0001")),
+            KeyValue.pair(1L, bytesValue("0003")),
+            KeyValue.pair(SEGMENT_INTERVAL, bytesValue("0005"))
+        );
        final List<KeyValue<Long, byte[]>> actual = toList(cachingStore.fetch(bytesKey("a"), 0, Long.MAX_VALUE));
        verifyKeyValueList(expected, actual);
    }
@@ -324,16 +321,32 @@ public void shouldFetchAndIterateOverKeyRange() {
        cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);
        cachingStore.put(bytesKey("a"), bytesValue("0003"), 1);
        cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1);
-        cachingStore.put(bytesKey("a"), bytesValue("0005"), 60000);
-
-        verifyKeyValueList(mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("a", "0005", 60000L)),
-                           toList(cachingStore.fetch(bytesKey("a"), bytesKey("a"), 0, Long.MAX_VALUE)));
-
-        verifyKeyValueList(mkList(windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1)),
-                           toList(cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0, Long.MAX_VALUE)));
-
-        verifyKeyValueList(mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1), windowedPair("a", "0005", 60000L)),
-                           toList(cachingStore.fetch(bytesKey("a"), bytesKey("aa"), 0, Long.MAX_VALUE)));
+        cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
+
+        verifyKeyValueList(
+            mkList(
+                windowedPair("a", "0001", 0),
+                windowedPair("a", "0003", 1),
+                windowedPair("a", "0005", SEGMENT_INTERVAL)
+            ),
+            toList(cachingStore.fetch(bytesKey("a"), bytesKey("a"), 0, Long.MAX_VALUE))
+        );
+
+        verifyKeyValueList(
+            mkList(windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1)),
+            toList(cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0, Long.MAX_VALUE))
+        );
+
+        verifyKeyValueList(
+            mkList(
+                windowedPair("a", "0001", 0),
+                windowedPair("a", "0003", 1),
+                windowedPair("aa", "0002", 0),
+                windowedPair("aa", "0004", 1),
+                windowedPair("a", "0005", SEGMENT_INTERVAL)
+            ),
+            toList(cachingStore.fetch(bytesKey("a"), bytesKey("aa"), 0, Long.MAX_VALUE))
+        );
    }
 
    @Test(expected = NullPointerException.class)
@@ -361,7 +374,7 @@ public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
        cachingStore.fetch(bytesKey("anyFrom"), null, 1L, 2L);
    }
 
-    private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(String key, String value, long timestamp) {
+    private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(final String key, final String value, final long timestamp) {
        return KeyValue.pair(new Windowed<>(bytesKey(key), new TimeWindow(timestamp, timestamp + WINDOW_SIZE)), bytesValue(value));
    }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 8e69ccbfee82e..cffd73f05a2b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -38,13 +38,10 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 import org.rocksdb.WriteBatch;
 
-import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
-
-
 import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -52,12 +49,13 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.SimpleTimeZone;
 
+import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -68,14 +66,13 @@
 @RunWith(Parameterized.class)
 public class RocksDBSegmentedBytesStoreTest {
 
+    private final long windowSizeForTimeWindow = 500;
    private final long retention = 1000;
-    private final long segmentInterval = 60_000;
-    private final int numSegments = 3;
+    private final long segmentInterval = 60_000L;
    private InternalMockProcessorContext context;
    private final String storeName = "bytes-store";
    private RocksDBSegmentedBytesStore bytesStore;
    private File stateDir;
-    private long windowSizeForTimeWindow = 500;
    private final Window[] windows = new Window[4];
 
    @Parameter
@@ -83,7 +80,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
    @Parameters(name = "{0}")
    public static Object[] getKeySchemas() {
-        return new Object[]{new SessionKeySchema(), new WindowKeySchema()};
+        return new Object[] {new SessionKeySchema(), new WindowKeySchema()};
    }
 
    @Before
@@ -94,29 +91,32 @@ public void before() {
            windows[0] = new SessionWindow(10, 10);
            windows[1] = new SessionWindow(500, 1000);
            windows[2] = new SessionWindow(1000, 1500);
-            windows[3] = new SessionWindow(30000, 60000);
+            windows[3] = new SessionWindow(30_000L, 60_000L);
        }
        if (schema instanceof WindowKeySchema) {
            windows[0] = timeWindowForSize(10, windowSizeForTimeWindow);
            windows[1] = timeWindowForSize(500, windowSizeForTimeWindow);
            windows[2] = timeWindowForSize(1000, windowSizeForTimeWindow);
-            windows[3] = timeWindowForSize(60000, windowSizeForTimeWindow);
+            windows[3] = timeWindowForSize(60_000L, windowSizeForTimeWindow);
        }
 
-        bytesStore = new RocksDBSegmentedBytesStore(storeName,
-                                                    retention,
-                                                    segmentInterval,
-                                                    schema);
+        bytesStore = new RocksDBSegmentedBytesStore(
+            storeName,
+            retention,
+            segmentInterval,
+            schema
+        );
 
        stateDir = TestUtils.tempDirectory();
        context = new InternalMockProcessorContext(
-                stateDir,
-                Serdes.String(),
-                Serdes.Long(),
-                new NoOpRecordCollector(),
-                new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
+            stateDir,
+            Serdes.String(),
+            Serdes.Long(),
+            new NoOpRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
        bytesStore.init(context, bytesStore);
    }
 
@@ -134,8 +134,10 @@ public void shouldPutAndFetch() {
        final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 500);
 
-        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
-                                                                              KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+            KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+            KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
+        );
 
        assertEquals(expected, toList(values));
    }
@@ -147,8 +149,10 @@ public void shouldFindValuesWithinRange() {
        bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
        bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
        final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999);
-        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
-                                                                              KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+            KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+            KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
+        );
 
        assertEquals(expected, toList(results));
    }
@@ -181,9 +185,14 @@ public void shouldRollSegments() {
 
        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
 
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+        assertEquals(
+            Arrays.asList(
+                KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
                KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
-                KeyValue.pair(new Windowed<>(key, windows[2]), 500L)), results);
+                KeyValue.pair(new Windowed<>(key, windows[2]), 500L)
+            ),
+            results
+        );
    }
 
@@ -198,13 +207,22 @@ public void shouldGetAllSegments() {
        assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
 
        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
-        assertEquals(Utils.mkSet(segments.segmentName(0),
-                segments.segmentName(1)), segmentDirs());
+        assertEquals(
+            Utils.mkSet(
+                segments.segmentName(0),
+                segments.segmentName(1)
+            ),
+            segmentDirs()
+        );
 
        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+        assertEquals(
+            Arrays.asList(
+                KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
                KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
-        ), results);
+            ),
+            results
+        );
    }
 
@@ -218,18 +236,27 @@ public void shouldFetchAllSegments() {
        assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
 
        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
-        assertEquals(Utils.mkSet(segments.segmentName(0),
-                segments.segmentName(1)), segmentDirs());
-
-        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60000L));
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+        assertEquals(
+            Utils.mkSet(
+                segments.segmentName(0),
+                segments.segmentName(1)
+            ),
+            segmentDirs()
+        );
+
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60_000L));
+        assertEquals(
+            Arrays.asList(
+                KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
                KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
-        ), results);
+            ),
+            results
+        );
    }
 
    @Test
-    public void shouldLoadSegementsWithOldStyleDateFormattedName() {
+    public void shouldLoadSegmentsWithOldStyleDateFormattedName() {
        final Segments segments = new Segments(storeName, retention, segmentInterval);
        final String key = "a";
 
@@ -247,20 +274,29 @@ public void shouldLoadSegementsWithOldStyleDateFormattedName() {
        final File oldStyleName = new File(parent, nameParts[0] + "-" + formatted);
        assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
 
-        bytesStore = new RocksDBSegmentedBytesStore(storeName,
-                                                    retention,
-                                                    segmentInterval,
-                                                    schema);
+        bytesStore = new RocksDBSegmentedBytesStore(
+            storeName,
+            retention,
+            segmentInterval,
+            schema
+        );
 
        bytesStore.init(context, bytesStore);
-        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
-        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
-                KeyValue.pair(new Windowed<>(key, windows[3]), 100L))));
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
+        assertThat(
+            results,
+            equalTo(
+                Arrays.asList(
+                    KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+                    KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
+                )
+            )
+        );
    }
 
    @Test
-    public void shouldLoadSegementsWithOldStyleColonFormattedName() {
+    public void shouldLoadSegmentsWithOldStyleColonFormattedName() {
        final Segments segments = new Segments(storeName, retention, segmentInterval);
        final String key = "a";
 
@@ -274,15 +310,24 @@ public void shouldLoadSegementsWithOldStyleColonFormattedName() {
        final File oldStyleName = new File(parent, nameParts[0] + ":" + Long.parseLong(nameParts[1]));
        assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
 
-        bytesStore = new RocksDBSegmentedBytesStore(storeName,
-                                                    retention,
-                                                    segmentInterval,
-                                                    schema);
+        bytesStore = new RocksDBSegmentedBytesStore(
+            storeName,
+            retention,
+            segmentInterval,
+            schema
+        );
 
        bytesStore.init(context, bytesStore);
-        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
-        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
-                KeyValue.pair(new Windowed<>(key, windows[3]), 100L))));
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
+        assertThat(
+            results,
+            equalTo(
+                Arrays.asList(
+                    KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+                    KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
+                )
+            )
+        );
    }
 
@@ -304,7 +349,7 @@ public void shouldCreateWriteBatches() {
        records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[3])).get(), serializeValue(100L)));
        final Map<Segment, WriteBatch> writeBatchMap = bytesStore.getWriteBatches(records);
        assertEquals(2, writeBatchMap.size());
-        for (final WriteBatch batch: writeBatchMap.values()) {
+        for (final WriteBatch batch : writeBatchMap.values()) {
            assertEquals(1, batch.count());
        }
    }
@@ -323,7 +368,7 @@ public void shouldRestoreToByteStore() {
        assertEquals(2, bytesStore.getSegments().size());
 
        // Bulk loading is enabled during recovery.
-        for (final Segment segment: bytesStore.getSegments()) {
+        for (final Segment segment : bytesStore.getSegments()) {
            Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
        }
 
@@ -347,20 +392,20 @@ public void shouldRespectBulkLoadOptionsDuringInit() {
 
        restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L);
 
-        for (final Segment segment: bytesStore.getSegments()) {
+        for (final Segment segment : bytesStore.getSegments()) {
            Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
        }
 
        restoreListener.onRestoreEnd(null, bytesStore.name(), 0L);
 
-        for (final Segment segment: bytesStore.getSegments()) {
+        for (final Segment segment : bytesStore.getSegments()) {
            Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(4));
        }
    }
 
    private Set<String> segmentDirs() {
-        File windowDir = new File(stateDir, storeName);
+        final File windowDir = new File(stateDir, storeName);
 
-        return new HashSet<>(Arrays.asList(windowDir.list()));
+        return Utils.mkSet(Objects.requireNonNull(windowDir.list()));
    }
 
    private byte[] serializeValue(final long value) {
@@ -383,14 +428,14 @@ private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) {
            final KeyValue<Bytes, byte[]> next = iterator.next();
            if (schema instanceof WindowKeySchema) {
                final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
-                        WindowKeySchema.fromStoreKey(next.key.get(), windowSizeForTimeWindow, stateSerdes),
-                        stateSerdes.valueDeserializer().deserialize("dummy", next.value)
+                    WindowKeySchema.fromStoreKey(next.key.get(), windowSizeForTimeWindow, stateSerdes),
+                    stateSerdes.valueDeserializer().deserialize("dummy", next.value)
                );
                results.add(deserialized);
            } else {
                final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
-                        SessionKeySchema.from(next.key.get(), stateSerdes.keyDeserializer(), "dummy"),
-                        stateSerdes.valueDeserializer().deserialize("dummy", next.value)
+                    SessionKeySchema.from(next.key.get(), stateSerdes.keyDeserializer(), "dummy"),
+                    stateSerdes.valueDeserializer().deserialize("dummy", next.value)
                );
                results.add(deserialized);
            }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index ac481a747ba02..b0057e5495f02 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -68,9 +68,9 @@ public class RocksDBWindowStoreTest {
 
    private final int numSegments = 3;
    private final long windowSize = 3L;
-    private final String windowName = "window";
-    private final long segmentInterval = 60_000;
+    private final long segmentInterval = 600L;
    private final long retentionPeriod = segmentInterval * (numSegments - 1);
+    private final String windowName = "window";
    private final Segments segments = new Segments(windowName, retentionPeriod, segmentInterval);
    private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
 
@@ -145,8 +145,8 @@ public void shouldOnlyIterateOpenSegments() {
        windowStore.put(1, "four");
 
        // should only have 2 values as the first segment is no longer open
-        assertEquals(new KeyValue<>(60000L, "two"), iterator.next());
-        assertEquals(new KeyValue<>(120000L, "three"), iterator.next());
+        assertEquals(new KeyValue<>(segmentInterval, "two"), iterator.next());
+        assertEquals(new KeyValue<>(2 * segmentInterval, "three"), iterator.next());
        assertFalse(iterator.hasNext());
    }
 
@@ -639,7 +639,7 @@ public void testSegmentMaintenance() {
            segmentDirs(baseDir)
        );
 
-        setCurrentTime(59999);
+        setCurrentTime(segmentInterval - 1);
        windowStore.put(0, "v");
        windowStore.put(0, "v");
        assertEquals(
@@ -647,7 +647,7 @@ public void testSegmentMaintenance() {
            segmentDirs(baseDir)
        );
 
-        setCurrentTime(60000);
+        setCurrentTime(segmentInterval);
        windowStore.put(0, "v");
        assertEquals(
            Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
@@ -657,7 +657,7 @@ public void testSegmentMaintenance() {
        WindowStoreIterator iter;
        int fetchedCount;
 
-        iter = windowStore.fetch(0, 0L, 240000L);
+        iter = windowStore.fetch(0, 0L, segmentInterval * 4);
        fetchedCount = 0;
        while (iter.hasNext()) {
            iter.next();
@@ -670,10 +670,10 @@ public void testSegmentMaintenance() {
            segmentDirs(baseDir)
        );
 
-        setCurrentTime(180000);
+        setCurrentTime(segmentInterval * 3);
        windowStore.put(0, "v");
 
-        iter = windowStore.fetch(0, 0L, 240000L);
+        iter = windowStore.fetch(0, 0L, segmentInterval * 4);
        fetchedCount = 0;
        while (iter.hasNext()) {
            iter.next();
@@ -686,10 +686,10 @@ public void testSegmentMaintenance() {
            segmentDirs(baseDir)
        );
 
-        setCurrentTime(300000);
+        setCurrentTime(segmentInterval * 5);
        windowStore.put(0, "v");
 
-        iter = windowStore.fetch(0, 240000L, 1000000L);
+        iter = windowStore.fetch(0, segmentInterval * 4, segmentInterval * 10);
        fetchedCount = 0;
        while (iter.hasNext()) {
            iter.next();
@@ -847,9 +847,9 @@ public void shouldFetchAndIterateOverExactBinaryKeys() {
 
        windowStore.init(context, windowStore);
 
-        final Bytes key1 = Bytes.wrap(new byte[]{0});
-        final Bytes key2 = Bytes.wrap(new byte[]{0, 0});
-        final Bytes key3 = Bytes.wrap(new byte[]{0, 0, 0});
+        final Bytes key1 = Bytes.wrap(new byte[] {0});
+        final Bytes key2 = Bytes.wrap(new byte[] {0, 0});
+        final Bytes key3 = Bytes.wrap(new byte[] {0, 0, 0});
        windowStore.put(key1, "1", 0);
        windowStore.put(key2, "2", 0);
        windowStore.put(key3, "3", 0);
@@ -924,11 +924,7 @@ private Map<Integer, Set<String>> entriesByKey(final List<KeyValue<byte[], byte[]>> changeLog,
 
-            Set<String> entries = entriesByKey.get(key);
-            if (entries == null) {
-                entries = new HashSet<>();
-                entriesByKey.put(key, entries);
-            }
+            final Set<String> entries = entriesByKey.computeIfAbsent(key, k -> new HashSet<>());
            entries.add(value + "@" + (timestamp - startTime));
        }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 1fc08534b182a..efed24f49e9fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -44,22 +44,24 @@ public class SegmentsTest {
 
    private static final int NUM_SEGMENTS = 5;
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
    private InternalMockProcessorContext context;
    private Segments segments;
-    private final long segmentInterval = 60_000L;
    private File stateDirectory;
-    private String storeName = "test";
-    private final int retentionPeriod = 4 * 60 * 1000;
+    private final String storeName = "test";
 
    @Before
    public void createContext() {
        stateDirectory = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext(stateDirectory,
-                                                   Serdes.String(),
-                                                   Serdes.Long(),
-                                                   new NoOpRecordCollector(),
-                                                   new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
-        segments = new Segments(storeName, retentionPeriod, segmentInterval);
+        context = new InternalMockProcessorContext(
+            stateDirectory,
+            Serdes.String(),
+            Serdes.Long(),
+            new NoOpRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new Segments(storeName, RETENTION_PERIOD, SEGMENT_INTERVAL);
    }
 
    @After
@@ -70,24 +72,24 @@ public void close() {
 
    @Test
    public void shouldGetSegmentIdsFromTimestamp() {
        assertEquals(0, segments.segmentId(0));
-        assertEquals(1, segments.segmentId(60000));
-        assertEquals(2, segments.segmentId(120000));
-        assertEquals(3, segments.segmentId(180000));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
    }
 
    @Test
    public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
-        final Segments segments = new Segments("test", 8 * 60 * 1000, 120_000);
+        final Segments segments = new Segments("test", 8 * SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
        assertEquals(0, segments.segmentId(0));
-        assertEquals(0, segments.segmentId(60000));
-        assertEquals(1, segments.segmentId(120000));
+        assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
    }
 
    @Test
-    public void shouldGetSegmentNameFromId() throws Exception {
+    public void shouldGetSegmentNameFromId() {
        assertEquals("test.0", segments.segmentName(0));
-        assertEquals("test." + segmentInterval, segments.segmentName(1));
-        assertEquals("test." + 2 * segmentInterval, segments.segmentName(2));
+        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
+        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
    }
 
    @Test
@@ -96,11 +98,11 @@ public void shouldCreateSegments() {
        final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
        final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context);
        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + segmentInterval).isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 2 * segmentInterval).isDirectory());
-        assertEquals(true, segment1.isOpen());
-        assertEquals(true, segment2.isOpen());
-        assertEquals(true, segment3.isOpen());
+        assertTrue(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test." + 2 * SEGMENT_INTERVAL).isDirectory());
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
    }
 
    @Test
@@ -114,14 +116,14 @@ public void shouldNotCreateSegmentThatIsAlreadyExpired() {
    public void shouldCleanupSegmentsThatHaveExpired() {
        final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
        final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
-        context.setStreamTime(segmentInterval * 7);
+        context.setStreamTime(SEGMENT_INTERVAL * 7);
        final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context);
        assertFalse(segment1.isOpen());
        assertFalse(segment2.isOpen());
        assertTrue(segment3.isOpen());
        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-        assertFalse(new File(context.stateDir(), "test/test." + segmentInterval).exists());
-        assertTrue(new File(context.stateDir(), "test/test." + 7 * segmentInterval).exists());
+        assertFalse(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).exists());
+        assertTrue(new File(context.stateDir(), "test/test." + 7 * SEGMENT_INTERVAL).exists());
    }
 
    @Test
@@ -151,6 +153,7 @@ public void shouldCloseAllOpenSegments() {
 
    @Test
    public void shouldOpenExistingSegments() {
+        segments = new Segments("test", 4, 1);
        segments.getOrCreateSegmentIfLive(0, context);
        segments.getOrCreateSegmentIfLive(1, context);
        segments.getOrCreateSegmentIfLive(2, context);
@@ -159,7 +162,7 @@ public void shouldOpenExistingSegments() {
        // close existing.
        segments.close();
 
-        segments = new Segments("test", 4 * 60 * 1000, 60_000);
+        segments = new Segments("test", 4, 1);
        segments.openExisting(context);
 
        assertTrue(segments.getSegmentForTimestamp(0).isOpen());
@@ -182,7 +185,7 @@ public void shouldGetSegmentsWithinTimeRange() {
        segments.getOrCreateSegmentIfLive(3, context);
        segments.getOrCreateSegmentIfLive(4, context);
 
-        final List<Segment> segments = this.segments.segments(0, 2 * 60 * 1000);
+        final List<Segment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
        assertEquals(3, segments.size());
        assertEquals(0, segments.get(0).id);
        assertEquals(1, segments.get(1).id);
@@ -190,14 +193,14 @@ public void shouldGetSegmentsWithinTimeRange() {
    }
 
    @Test
-    public void shouldGetSegmentsWithinTimeRangeOutOfOrder() throws Exception {
+    public void shouldGetSegmentsWithinTimeRangeOutOfOrder() {
        updateStreamTimeAndCreateSegment(4);
        updateStreamTimeAndCreateSegment(2);
        updateStreamTimeAndCreateSegment(0);
        updateStreamTimeAndCreateSegment(1);
        updateStreamTimeAndCreateSegment(3);
 
-        final List<Segment> segments = this.segments.segments(0, 2 * 60 * 1000);
+        final List<Segment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
        assertEquals(3, segments.size());
        assertEquals(0, segments.get(0).id);
        assertEquals(1, segments.get(1).id);
@@ -241,14 +244,19 @@ public void futureEventsShouldNotCauseSegmentRoll() {
    }
 
    private void updateStreamTimeAndCreateSegment(final int segment) {
-        context.setStreamTime(segmentInterval * segment);
+        context.setStreamTime(SEGMENT_INTERVAL * segment);
        segments.getOrCreateSegmentIfLive(segment, context);
    }
 
    @Test
    public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exception {
+        final long segmentInterval = 60_000L; // the old segment file's naming system maxes out at 1 minute granularity.
+
+        segments = new Segments(storeName, NUM_SEGMENTS * segmentInterval, segmentInterval);
+
        final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
        final File storeDirectory = new File(storeDirectoryPath);
+        //noinspection ResultOfMethodCallIgnored
        storeDirectory.mkdirs();
 
        final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
@@ -256,13 +264,15 @@ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exception {
 
        for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
            final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval)));
+            //noinspection ResultOfMethodCallIgnored
            oldSegment.createNewFile();
        }
 
        segments.openExisting(context);
 
        for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
-            final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
+            final String segmentName = storeName + "." + (long) segmentId * segmentInterval;
+            final File newSegment = new File(storeDirectoryPath + File.separator + segmentName);
            assertTrue(newSegment.exists());
        }
    }
@@ -271,17 +281,19 @@ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exception {
    public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Exception {
        final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
        final File storeDirectory = new File(storeDirectoryPath);
+        //noinspection ResultOfMethodCallIgnored
        storeDirectory.mkdirs();
 
        for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
-            final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
+            final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1)));
+            //noinspection ResultOfMethodCallIgnored
            oldSegment.createNewFile();
        }
 
        segments.openExisting(context);
 
        for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
-            final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
+            final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1)));
            assertTrue(newSegment.exists());
        }
    }
@@ -292,6 +304,7 @@ public void shouldClearSegmentsOnClose() {
        segments.close();
        assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
    }
+
    private void verifyCorrectSegments(final long first, final int numSegments) {
        final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
        assertEquals(numSegments, result.size());