Skip to content

Commit

Permalink
MINOR: Remove 1 minute minimum segment interval (#5323)
Browse files Browse the repository at this point in the history
* new minimum is 0, just like window size
* refactor tests to use smaller segment sizes as well

Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
vvcephei authored and guozhangwang committed Aug 1, 2018
1 parent e09d6d7 commit 814fbe0
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 240 deletions.
14 changes: 5 additions & 9 deletions streams/src/main/java/org/apache/kafka/streams/state/Stores.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

Expand Down Expand Up @@ -74,8 +72,6 @@
@InterfaceStability.Evolving
public class Stores {

private static final Logger log = LoggerFactory.getLogger(Stores.class);

/**
* Create a persistent {@link KeyValueBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
Expand Down Expand Up @@ -195,7 +191,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
* Create a persistent {@link WindowBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* @param segmentInterval size of segments in ms (must be at least one minute)
* @param segmentInterval size of segments in ms (cannot be negative)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
Expand All @@ -206,14 +202,14 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final boolean retainDuplicates,
final long segmentInterval) {
Objects.requireNonNull(name, "name cannot be null");
if (retentionPeriod < 0) {
if (retentionPeriod < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
}
if (windowSize < 0) {
if (windowSize < 0L) {
throw new IllegalArgumentException("windowSize cannot be negative");
}
if (segmentInterval < 60_000) {
throw new IllegalArgumentException("segmentInterval must be at least one minute");
if (segmentInterval < 1L) {
throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
}

return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public void shouldThrowIfPersistentKeyValueStoreStoreNameIsNull() {

@Test(expected = NullPointerException.class)
public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() {
//noinspection ResultOfMethodCallIgnored
Stores.inMemoryKeyValueStore(null);
}

Expand All @@ -53,12 +54,12 @@ public void shouldThrowIfILruMapStoreCapacityIsNegative() {

@Test(expected = NullPointerException.class)
public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
Stores.persistentWindowStore(null, 0L, 0L, false, 60_000L);
Stores.persistentWindowStore(null, 0L, 0L, false, 0L);
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
Stores.persistentWindowStore("anyName", -1L, 0L, false, 60_000L);
Stores.persistentWindowStore("anyName", -1L, 0L, false, 0L);
}

@Deprecated
Expand All @@ -74,7 +75,7 @@ public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() {

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfIPersistentWindowStoreIfSegmentIntervalIsTooSmall() {
Stores.persistentWindowStore("anyName", 1L, 1L, false, 59_999L);
Stores.persistentWindowStore("anyName", 1L, 1L, false, -1L);
}

@Test(expected = NullPointerException.class)
Expand Down Expand Up @@ -129,25 +130,31 @@ public void shouldCreateRocksDbSessionStore() {

@Test
public void shouldBuildWindowStore() {
final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3L, 3L, true),
Serdes.String(),
Serdes.String()).build();
final WindowStore<String, String> store = Stores.windowStoreBuilder(
Stores.persistentWindowStore("store", 3L, 3L, true),
Serdes.String(),
Serdes.String()
).build();
assertThat(store, not(nullValue()));
}

@Test
public void shouldBuildKeyValueStore() {
final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("name"),
Serdes.String(),
Serdes.String()).build();
final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("name"),
Serdes.String(),
Serdes.String()
).build();
assertThat(store, not(nullValue()));
}

@Test
public void shouldBuildSessionStore() {
final SessionStore<String, String> store = Stores.sessionStoreBuilder(Stores.persistentSessionStore("name", 10),
Serdes.String(),
Serdes.String()).build();
final SessionStore<String, String> store = Stores.sessionStoreBuilder(
Stores.persistentSessionStore("name", 10),
Serdes.String(),
Serdes.String()
).build();
assertThat(store, not(nullValue()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
Expand All @@ -38,9 +37,12 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList;
import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
Expand All @@ -49,14 +51,15 @@
import static org.junit.Assert.assertFalse;


@SuppressWarnings("PointlessArithmeticExpression")
public class CachingSessionStoreTest {

private static final int MAX_CACHE_SIZE_BYTES = 600;
private InternalMockProcessorContext context;
private static final Long DEFAULT_TIMESTAMP = 10L;
private static final long SEGMENT_INTERVAL = 100L;
private RocksDBSegmentedBytesStore underlying;
private CachingSessionStore<String, String> cachingStore;
private ThreadCache cache;
private static final Long DEFAULT_TIMESTAMP = 10L;
private final Bytes keyA = Bytes.wrap("a".getBytes());
private final Bytes keyAA = Bytes.wrap("aa".getBytes());
private final Bytes keyB = Bytes.wrap("b".getBytes());
Expand All @@ -65,17 +68,11 @@ public class CachingSessionStoreTest {
public void setUp() {
final SessionKeySchema schema = new SessionKeySchema();
schema.init("topic");
final int retention = 60000;
final int segmentInterval = 60_000;
underlying = new RocksDBSegmentedBytesStore("test", retention, segmentInterval, schema);
underlying = new RocksDBSegmentedBytesStore("test", 0L, SEGMENT_INTERVAL, schema);
final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
cachingStore = new CachingSessionStore<>(sessionStore,
Serdes.String(),
Serdes.String(),
segmentInterval
);
cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.String(), SEGMENT_INTERVAL);
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic", null));
cachingStore.init(context, cachingStore);
}
Expand Down Expand Up @@ -134,11 +131,13 @@ public void shouldPutFetchRangeFromCache() {

@Test
public void shouldFetchAllSessionsWithSameRecordKey() {
final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()),
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()),
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()),
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes()));
for (KeyValue<Windowed<Bytes>, byte[]> kv : expected) {
final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()),
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()),
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()),
KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes())
);
for (final KeyValue<Windowed<Bytes>, byte[]> kv : expected) {
cachingStore.put(kv.key, kv.value);
}

Expand Down Expand Up @@ -184,14 +183,14 @@ public void shouldRemove() {

@Test
public void shouldFetchCorrectlyAcrossSegments() {
final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(60_000, 60_000));
final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(120_000, 120_000));
final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
cachingStore.put(a1, "1".getBytes());
cachingStore.put(a2, "2".getBytes());
cachingStore.put(a3, "3".getBytes());
cachingStore.flush();
final KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.findSessions(keyA, 0, 60_000 * 2);
final KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 2);
assertEquals(a1, results.next().key);
assertEquals(a2, results.next().key);
assertEquals(a3, results.next().key);
Expand All @@ -200,64 +199,65 @@ public void shouldFetchCorrectlyAcrossSegments() {

@Test
public void shouldFetchRangeCorrectlyAcrossSegments() {
final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(0, 0));
final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(60_000, 60_000));
final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(60_000 * 2, 60_000 * 2));
final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(60_000 * 2, 60_000 * 2));
final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
cachingStore.put(a1, "1".getBytes());
cachingStore.put(aa1, "1".getBytes());
cachingStore.put(a2, "2".getBytes());
cachingStore.put(a3, "3".getBytes());
cachingStore.put(aa3, "3".getBytes());
cachingStore.flush();

final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, 60_000 * 2);
assertEquals(a1, rangeResults.next().key);
assertEquals(aa1, rangeResults.next().key);
assertEquals(a2, rangeResults.next().key);
assertEquals(a3, rangeResults.next().key);
assertEquals(aa3, rangeResults.next().key);
assertFalse(rangeResults.hasNext());
final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2);
final Set<Windowed<Bytes>> keys = new HashSet<>();
while (rangeResults.hasNext()) {
keys.add(rangeResults.next().key);
}
rangeResults.close();
assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);
}

@Test
public void shouldForwardChangedValuesDuringFlush() {
final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
@Override
public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
}
}, true);

cachingStore.setFlushListener(
(key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
true
);

cachingStore.put(a, "1".getBytes());
cachingStore.flush();

cachingStore.put(a, "2".getBytes());
cachingStore.flush();

cachingStore.remove(a);
cachingStore.flush();

assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
KeyValue.pair(aDeserialized, new Change<>("2", "1")),
KeyValue.pair(aDeserialized, new Change<>(null, "2"))));
assertEquals(
flushed,
Arrays.asList(
KeyValue.pair(aDeserialized, new Change<>("1", null)),
KeyValue.pair(aDeserialized, new Change<>("2", "1")),
KeyValue.pair(aDeserialized, new Change<>(null, "2"))
)
);
}

@Test
public void shouldForwardChangedValuesDuringFlushWhenSendOldValuesDisabledNewRecordIsNull() {
final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
@Override
public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
}
}, false);
cachingStore.setFlushListener(
(key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
false
);

cachingStore.put(a, "1".getBytes());
cachingStore.flush();
Expand All @@ -268,22 +268,25 @@ public void apply(final Windowed<String> key, final String newValue, final Strin
cachingStore.remove(a);
cachingStore.flush();

assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
KeyValue.pair(aDeserialized, new Change<>("2", null)),
KeyValue.pair(aDeserialized, new Change<>(null, "2"))));
assertEquals(
flushed,
Arrays.asList(
KeyValue.pair(aDeserialized, new Change<>("1", null)),
KeyValue.pair(aDeserialized, new Change<>("2", null)),
KeyValue.pair(aDeserialized, new Change<>(null, "2"))
)
);
}

@Test
public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
@Override
public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
}
}, false);
cachingStore.setFlushListener(
(key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
false
);

cachingStore.put(a, "1".getBytes());
cachingStore.flush();
Expand All @@ -292,8 +295,13 @@ public void apply(final Windowed<String> key, final String newValue, final Strin
cachingStore.flush();


assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
KeyValue.pair(aDeserialized, new Change<>("2", null))));
assertEquals(
flushed,
Arrays.asList(
KeyValue.pair(aDeserialized, new Change<>("1", null)),
KeyValue.pair(aDeserialized, new Change<>("2", null))
)
);
}

@Test
Expand Down Expand Up @@ -369,7 +377,7 @@ public void shouldThrowNullPointerExceptionOnPutNullKey() {
cachingStore.put(null, "1".getBytes());
}

private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(final String...sessionIds) {
private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(final String... sessionIds) {
final Random random = new Random();
final List<KeyValue<Windowed<Bytes>, byte[]>> results = new ArrayList<>();
while (cache.size() == results.size()) {
Expand Down
Loading

0 comments on commit 814fbe0

Please sign in to comment.