MINOR: Remove 1 minute minimum segment interval #5323

Merged (3 commits) on Aug 1, 2018
14 changes: 5 additions & 9 deletions streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -29,8 +29,6 @@
 import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 
@@ -74,8 +72,6 @@
 @InterfaceStability.Evolving
 public class Stores {
 
-    private static final Logger log = LoggerFactory.getLogger(Stores.class);
-
     /**
      * Create a persistent {@link KeyValueBytesStoreSupplier}.
      * @param name name of the store (cannot be {@code null})
@@ -195,7 +191,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
      * Create a persistent {@link WindowBytesStoreSupplier}.
      * @param name name of the store (cannot be {@code null})
      * @param retentionPeriod length of time to retain data in the store (cannot be negative)
-     * @param segmentInterval size of segments in ms (must be at least one minute)
+     * @param segmentInterval size of segments in ms (cannot be negative)
      * @param windowSize size of the windows (cannot be negative)
      * @param retainDuplicates whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
@@ -206,14 +202,14 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                                  final boolean retainDuplicates,
                                                                  final long segmentInterval) {
         Objects.requireNonNull(name, "name cannot be null");
-        if (retentionPeriod < 0) {
+        if (retentionPeriod < 0L) {
             throw new IllegalArgumentException("retentionPeriod cannot be negative");
         }
-        if (windowSize < 0) {
+        if (windowSize < 0L) {
             throw new IllegalArgumentException("windowSize cannot be negative");
         }
-        if (segmentInterval < 60_000) {
-            throw new IllegalArgumentException("segmentInterval must be at least one minute");
+        if (segmentInterval < 1L) {
+            throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
         }
 
         return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates);
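With the floor gone, any positive segment interval is accepted. As a minimal sketch of the relaxed API (the store name and durations below are arbitrary, chosen only for illustration), a caller can now configure sub-minute segments:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowStore;

    // 1 s retention, 500 ms windows, no duplicates, 100 ms segments.
    // Before this patch, the 100 ms segment interval would have thrown
    // IllegalArgumentException("segmentInterval must be at least one minute").
    final StoreBuilder<WindowStore<String, String>> builder = Stores.windowStoreBuilder(
        Stores.persistentWindowStore("illustrative-store", 1_000L, 500L, false, 100L),
        Serdes.String(),
        Serdes.String()
    );
    final WindowStore<String, String> store = builder.build();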
streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -38,6 +38,7 @@ public void shouldThrowIfPersistentKeyValueStoreStoreNameIsNull() {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() {
+        //noinspection ResultOfMethodCallIgnored
Review comment (Contributor Author): I've also included some general cleanup of warnings.
         Stores.inMemoryKeyValueStore(null);
     }
 
@@ -53,12 +54,12 @@ public void shouldThrowIfILruMapStoreCapacityIsNegative() {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
-        Stores.persistentWindowStore(null, 0L, 0L, false, 60_000L);
+        Stores.persistentWindowStore(null, 0L, 0L, false, 0L);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
-        Stores.persistentWindowStore("anyName", -1L, 0L, false, 60_000L);
+        Stores.persistentWindowStore("anyName", -1L, 0L, false, 0L);
     }
 
     @Deprecated
@Deprecated
Expand All @@ -74,7 +75,7 @@ public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() {

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfIPersistentWindowStoreIfSegmentIntervalIsTooSmall() {
Stores.persistentWindowStore("anyName", 1L, 1L, false, 59_999L);
Stores.persistentWindowStore("anyName", 1L, 1L, false, -1L);
}

@Test(expected = NullPointerException.class)
@@ -129,25 +130,31 @@ public void shouldCreateRocksDbSessionStore() {
 
     @Test
     public void shouldBuildWindowStore() {
-        final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3L, 3L, true),
-                                                                            Serdes.String(),
-                                                                            Serdes.String()).build();
+        final WindowStore<String, String> store = Stores.windowStoreBuilder(
+            Stores.persistentWindowStore("store", 3L, 3L, true),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
         assertThat(store, not(nullValue()));
     }
 
     @Test
     public void shouldBuildKeyValueStore() {
-        final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("name"),
-                                                                                Serdes.String(),
-                                                                                Serdes.String()).build();
+        final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore("name"),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
         assertThat(store, not(nullValue()));
     }
 
     @Test
     public void shouldBuildSessionStore() {
-        final SessionStore<String, String> store = Stores.sessionStoreBuilder(Stores.persistentSessionStore("name", 10),
-                                                                              Serdes.String(),
-                                                                              Serdes.String()).build();
+        final SessionStore<String, String> store = Stores.sessionStoreBuilder(
+            Stores.persistentSessionStore("name", 10),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
         assertThat(store, not(nullValue()));
     }
 }
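The new guard rejects zero as well as negative intervals (segmentInterval < 1L). The test above pins the negative case; a hypothetical companion test for the zero boundary (not part of this PR) would follow the same pattern:

    @Test(expected = IllegalArgumentException.class)
    public void shouldThrowIfIPersistentWindowStoreIfSegmentIntervalIsZero() {
        Stores.persistentWindowStore("anyName", 1L, 1L, false, 0L);
    }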
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -23,7 +23,6 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -38,9 +37,12 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 
+import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
@@ -49,14 +51,15 @@
 import static org.junit.Assert.assertFalse;
 
 
+@SuppressWarnings("PointlessArithmeticExpression")
 public class CachingSessionStoreTest {
 
     private static final int MAX_CACHE_SIZE_BYTES = 600;
-    private InternalMockProcessorContext context;
+    private static final Long DEFAULT_TIMESTAMP = 10L;
+    private static final long SEGMENT_INTERVAL = 100L;
Review comment (Contributor Author): I searched through the code base to find all the spots where we set the segment interval to 1 minute just because it was the minimum. I've set them all lower (typically to 100 ms, because it's a nice round number), mostly to avoid implying that there's anything special about 60s anymore.
     private RocksDBSegmentedBytesStore underlying;
     private CachingSessionStore<String, String> cachingStore;
     private ThreadCache cache;
-    private static final Long DEFAULT_TIMESTAMP = 10L;
     private final Bytes keyA = Bytes.wrap("a".getBytes());
     private final Bytes keyAA = Bytes.wrap("aa".getBytes());
     private final Bytes keyB = Bytes.wrap("b".getBytes());
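The comment above holds because segmented stores locate a record's segment by integer division of its timestamp by the interval, so no interval value is special. A rough sketch of that arithmetic (an assumption about the internals, not code from this PR):

    // Sketch: timestamps map to segments by integer division, so a 100 ms
    // interval behaves like a 60,000 ms one; only the granularity changes.
    static long segmentId(final long timestamp, final long segmentInterval) {
        return timestamp / segmentInterval;
    }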
@@ -65,17 +68,11 @@ public class CachingSessionStoreTest {
     public void setUp() {
         final SessionKeySchema schema = new SessionKeySchema();
         schema.init("topic");
-        final int retention = 60000;
-        final int segmentInterval = 60_000;
-        underlying = new RocksDBSegmentedBytesStore("test", retention, segmentInterval, schema);
+        underlying = new RocksDBSegmentedBytesStore("test", 0L, SEGMENT_INTERVAL, schema);
         final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
-        cachingStore = new CachingSessionStore<>(sessionStore,
-                                                 Serdes.String(),
-                                                 Serdes.String(),
-                                                 segmentInterval
-        );
+        cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.String(), SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic", null));
         cachingStore.init(context, cachingStore);
     }
@@ -134,11 +131,13 @@ public void shouldPutFetchRangeFromCache() {
 
     @Test
     public void shouldFetchAllSessionsWithSameRecordKey() {
-        final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()),
-                                                                               KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()),
-                                                                               KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()),
-                                                                               KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes()));
-        for (KeyValue<Windowed<Bytes>, byte[]> kv : expected) {
+        final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(
+            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()),
+            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()),
+            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()),
+            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes())
+        );
+        for (final KeyValue<Windowed<Bytes>, byte[]> kv : expected) {
             cachingStore.put(kv.key, kv.value);
         }
 
@@ -184,14 +183,14 @@ public void shouldRemove() {
 
     @Test
     public void shouldFetchCorrectlyAcrossSegments() {
-        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
-        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(60_000, 60_000));
-        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(120_000, 120_000));
+        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
Review comment (Contributor Author): In all the places where the test logic actually depends on the segment interval, I've expanded the expressions to include it. Theoretically, we should be able to plug in any number for the segment interval, and the tests should still pass.
         cachingStore.put(a1, "1".getBytes());
         cachingStore.put(a2, "2".getBytes());
         cachingStore.put(a3, "3".getBytes());
         cachingStore.flush();
-        final KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.findSessions(keyA, 0, 60_000 * 2);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 2);
         assertEquals(a1, results.next().key);
         assertEquals(a2, results.next().key);
         assertEquals(a3, results.next().key);
@@ -200,64 +199,65 @@
 
     @Test
     public void shouldFetchRangeCorrectlyAcrossSegments() {
-        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
-        final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(0, 0));
-        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(60_000, 60_000));
-        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(60_000 * 2, 60_000 * 2));
-        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(60_000 * 2, 60_000 * 2));
+        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
         cachingStore.put(a1, "1".getBytes());
         cachingStore.put(aa1, "1".getBytes());
         cachingStore.put(a2, "2".getBytes());
         cachingStore.put(a3, "3".getBytes());
         cachingStore.put(aa3, "3".getBytes());
         cachingStore.flush();
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, 60_000 * 2);
-        assertEquals(a1, rangeResults.next().key);
-        assertEquals(aa1, rangeResults.next().key);
-        assertEquals(a2, rangeResults.next().key);
-        assertEquals(a3, rangeResults.next().key);
-        assertEquals(aa3, rangeResults.next().key);
-        assertFalse(rangeResults.hasNext());
+        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2);
+        final Set<Windowed<Bytes>> keys = new HashSet<>();
+        while (rangeResults.hasNext()) {
+            keys.add(rangeResults.next().key);
+        }
+        rangeResults.close();
+        assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);
     }
 
     @Test
     public void shouldForwardChangedValuesDuringFlush() {
         final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
         final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
         final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
-        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
-            @Override
-            public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
-                flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
-            }
-        }, true);
+        cachingStore.setFlushListener(
+            (key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
+            true
+        );
 
         cachingStore.put(a, "1".getBytes());
         cachingStore.flush();
 
         cachingStore.put(a, "2".getBytes());
         cachingStore.flush();
 
         cachingStore.remove(a);
         cachingStore.flush();
 
-        assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
-                                            KeyValue.pair(aDeserialized, new Change<>("2", "1")),
-                                            KeyValue.pair(aDeserialized, new Change<>(null, "2"))));
+        assertEquals(
+            flushed,
+            Arrays.asList(
+                KeyValue.pair(aDeserialized, new Change<>("1", null)),
+                KeyValue.pair(aDeserialized, new Change<>("2", "1")),
+                KeyValue.pair(aDeserialized, new Change<>(null, "2"))
+            )
+        );
     }
 
     @Test
     public void shouldForwardChangedValuesDuringFlushWhenSendOldValuesDisabledNewRecordIsNull() {
         final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
         final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
         final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
-        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
-            @Override
-            public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
-                flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
-            }
-        }, false);
+        cachingStore.setFlushListener(
+            (key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
+            false
+        );
 
         cachingStore.put(a, "1".getBytes());
         cachingStore.flush();
@@ -268,22 +268,25 @@ public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
         cachingStore.remove(a);
         cachingStore.flush();
 
-        assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
-                                            KeyValue.pair(aDeserialized, new Change<>("2", null)),
-                                            KeyValue.pair(aDeserialized, new Change<>(null, "2"))));
+        assertEquals(
+            flushed,
+            Arrays.asList(
+                KeyValue.pair(aDeserialized, new Change<>("1", null)),
+                KeyValue.pair(aDeserialized, new Change<>("2", null)),
+                KeyValue.pair(aDeserialized, new Change<>(null, "2"))
+            )
+        );
     }
 
     @Test
     public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
         final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
         final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
         final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
-        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
-            @Override
-            public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
-                flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
-            }
-        }, false);
+        cachingStore.setFlushListener(
+            (key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
+            false
+        );
 
         cachingStore.put(a, "1".getBytes());
         cachingStore.flush();
@@ -292,8 +295,13 @@ public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
         cachingStore.flush();
 
 
-        assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
-                                            KeyValue.pair(aDeserialized, new Change<>("2", null))));
+        assertEquals(
+            flushed,
+            Arrays.asList(
+                KeyValue.pair(aDeserialized, new Change<>("1", null)),
+                KeyValue.pair(aDeserialized, new Change<>("2", null))
+            )
+        );
     }
 
     @Test
@@ -369,7 +377,7 @@ public void shouldThrowNullPointerExceptionOnPutNullKey() {
         cachingStore.put(null, "1".getBytes());
     }
 
-    private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(final String...sessionIds) {
+    private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(final String... sessionIds) {
        final Random random = new Random();
        final List<KeyValue<Windowed<Bytes>, byte[]>> results = new ArrayList<>();
        while (cache.size() == results.size()) {
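As the author's comments note, every segment-crossing assertion above is now written in terms of SEGMENT_INTERVAL. A hypothetical helper (invented here for illustration, not part of the PR) makes the pattern explicit:

    // Hypothetical helper: derive a session window's bounds from the segment
    // interval, so the assertions hold for any positive interval, 100 ms or 60 s.
    private static SessionWindow windowInSegment(final long segmentInterval, final int segment) {
        final long boundary = segmentInterval * segment;
        return new SessionWindow(boundary, boundary);
    }

With it, new Windowed<>(keyA, windowInSegment(SEGMENT_INTERVAL, 2)) reads the same as the expanded SEGMENT_INTERVAL * 2 expressions in the diff.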