From 7991605d84d9e84e39587443bcc65b82f3326271 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 19 Jun 2020 16:20:52 +0900 Subject: [PATCH 01/18] Add omitted [WindowStoreBuilderTest, TimestampedWindowStoreBuilderTest]#shouldThrowNullPointerIfMetricsScopeIsNull: Other StateStores has it. --- .../state/internals/TimestampedWindowStoreBuilderTest.java | 5 +++++ .../streams/state/internals/WindowStoreBuilderTest.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java index 539df95d73f2..1fae9bc244e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java @@ -200,4 +200,9 @@ public void shouldThrowNullPointerIfTimeIsNull() { assertThrows(NullPointerException.class, () -> new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null)); } + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfMetricsScopeIsNull() { + new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + } + } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java index ed43c4a9dfbd..b4c96192765e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java @@ -154,4 +154,9 @@ public void shouldThrowNullPointerIfTimeIsNull() { new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null); } + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfMetricsScopeIsNull() { + new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + } + } \ No newline at end of file From 7cfe008fbd517a20e4c28aa2de6c84692bea0bc7 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 19 Jun 2020 20:47:14 +0900 Subject: [PATCH 02/18] Improve Stores Javadoc 1. Fix javadoc typo: Stores#timestampedWindowStoreBuilder, windows-store -> window-store (Compare with Stores#timestampedKeyValueStoreBuilder) 2. Add omitted IllegalArgumentException condition in Stores#lruMap. 3. Add omitted IllegalArgumentException condition in WindowBytesStoreSupplier methods. --- .../main/java/org/apache/kafka/streams/state/Stores.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 066d499e5ffc..c9864506877c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -150,6 +150,7 @@ public String metricsScope() { * @param maxCacheSize maximum number of items in the LRU (cannot be negative) * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to build * an LRU Map based store + * @throws IllegalArgumentException if {@code maxCacheSize} is negative */ public static KeyValueBytesStoreSupplier lruMap(final String name, final int maxCacheSize) { Objects.requireNonNull(name, "name cannot be null"); @@ -231,6 +232,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, * caching and means that null values will be ignored. * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} + * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ public static WindowBytesStoreSupplier persistentWindowStore(final String name, final Duration retentionPeriod, @@ -257,6 +259,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, * caching and means that null values will be ignored. * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} + * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name, final Duration retentionPeriod, @@ -328,6 +331,7 @@ private static WindowBytesStoreSupplier persistentWindowStore(final String name, * caching and means that null values will be ignored. * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} + * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ public static WindowBytesStoreSupplier inMemoryWindowStore(final String name, final Duration retentionPeriod, @@ -481,7 +485,7 @@ public static StoreBuilder> windowStoreBuilder(final Wi *

* The provided supplier should not be a supplier for * {@link WindowStore WindowStores}. For this case, passed in timestamps will be dropped and not stored in the - * windows-store. On read, no valid timestamp but a dummy timestamp will be returned. + * window-store. On read, no valid timestamp but a dummy timestamp will be returned. * * @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null}) * @param keySerde the key serde to use From 3f9f171610fa2e13c10387b1765924f91fa992c4 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 19 Jun 2020 20:48:48 +0900 Subject: [PATCH 03/18] Remove unused method + duplicated parameters 1. Remove unused method: TimeWindowedSerde#timeWindowedSerdeFrom(Class, long) 2. Remove duplicated parameter: Log#splitOverflowedSegment --- core/src/main/scala/kafka/log/Log.scala | 2 +- .../org/apache/kafka/streams/kstream/WindowedSerdes.java | 8 -------- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index ad4bd6e4bb37..f469727ce8e4 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2447,7 +2447,7 @@ class Log(@volatile private var _dir: File, // replace old segment with new ones info(s"Replacing overflowed segment $segment with split segments $newSegments") - replaceSegments(newSegments.toList, List(segment), isRecoveredSwapFile = false) + replaceSegments(newSegments.toList, List(segment)) newSegments.toList } catch { case e: Exception => diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java index 07beb5a2a2f5..06ce7b719f94 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java @@ -64,14 +64,6 @@ static public Serde> timeWindowedSerdeFrom(final Class type) return new TimeWindowedSerde<>(Serdes.serdeFrom(type)); } - /** - * Construct a {@code TimeWindowedSerde} object to deserialize changelog topic - * for the specified inner class type and window size. - */ - static public Serde> timeWindowedSerdeFrom(final Class type, final long windowSize) { - return new TimeWindowedSerde<>(Serdes.serdeFrom(type), windowSize); - } - /** * Construct a {@code SessionWindowedSerde} object for the specified inner class type. */ From 9bdc180ef6041be375c34cc4644904cb5e0995a2 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 19 Jun 2020 20:57:01 +0900 Subject: [PATCH 04/18] Remove unthrown Exceptions: TaskManager, ProduceBenchWorker --- .../org/apache/kafka/trogdor/coordinator/TaskManager.java | 4 ++-- .../org/apache/kafka/trogdor/workload/ProduceBenchWorker.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index 69d2ddae7637..a307e595a870 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -671,7 +671,7 @@ public TaskState call() throws Exception { /** * Initiate shutdown, but do not wait for it to complete. */ - public void beginShutdown(boolean stopAgents) throws ExecutionException, InterruptedException { + public void beginShutdown(boolean stopAgents) { if (shutdown.compareAndSet(false, true)) { executor.submit(new Shutdown(stopAgents)); } @@ -680,7 +680,7 @@ public void beginShutdown(boolean stopAgents) throws ExecutionException, Interru /** * Wait for shutdown to complete. May be called prior to beginShutdown. */ - public void waitForShutdown() throws ExecutionException, InterruptedException { + public void waitForShutdown() throws InterruptedException { while (!executor.awaitTermination(1, TimeUnit.DAYS)) { } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java index 4b262eff8b54..42127fdb3463 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java @@ -79,7 +79,7 @@ public ProduceBenchWorker(String id, ProduceBenchSpec spec) { @Override public void start(Platform platform, WorkerStatusTracker status, - KafkaFutureImpl doneFuture) throws Exception { + KafkaFutureImpl doneFuture) { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("ProducerBenchWorker is already running."); } From bede35cef55008e089f8ea5b95c3aef4824dd2a8 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 19 Jun 2020 20:57:30 +0900 Subject: [PATCH 05/18] Remove unnecessary public from [Sink,Source]TaskContext --- .../java/org/apache/kafka/connect/sink/SinkTaskContext.java | 2 +- .../java/org/apache/kafka/connect/source/SourceTaskContext.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java index 33c82cb4a546..c4522c796286 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java @@ -33,7 +33,7 @@ public interface SinkTaskContext { * and the configuration is using variable references such as those compatible with * {@link org.apache.kafka.common.config.ConfigTransformer}. */ - public Map configs(); + Map configs(); /** * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java index 2e87986648f5..ddb0a7871835 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java @@ -32,7 +32,7 @@ public interface SourceTaskContext { * and the configuration is using variable references such as those compatible with * {@link org.apache.kafka.common.config.ConfigTransformer}. */ - public Map configs(); + Map configs(); /** * Get the OffsetStorageReader for this SourceTask. From 1f895311913681a3bec079c55b415ae0a50cbd70 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 19 Jun 2020 21:01:13 +0900 Subject: [PATCH 06/18] Fix incorrect MeteredTimestampedWindowStore javadoc --- .../streams/state/internals/MeteredTimestampedWindowStore.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java index 3c386f3f7db6..f61ebd47a95e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java @@ -28,10 +28,11 @@ import org.apache.kafka.streams.state.WindowStore; /** - * A Metered {@link MeteredTimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its + * A Metered {@link TimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its * inner WindowStore implementation do not need to provide its own metrics collecting functionality. * The inner {@link WindowStore} of this class is of type <Bytes,byte[]>, hence we use {@link Serde}s * to convert from <K,ValueAndTimestamp<V>> to <Bytes,byte[]> + * * @param * @param */ From b9525c50c525c8fa2382b81ec9c53661a2ab1580 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 19 Jun 2020 21:05:36 +0900 Subject: [PATCH 07/18] Remove duplicated entity in checkstyle/import-control.xml --- checkstyle/import-control.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 25414f30f161..c68a49ab3459 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -291,7 +291,6 @@ - From ada16b49c61068e0d80e057fdada1c5dbf205452 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 19 Jun 2020 21:15:14 +0900 Subject: [PATCH 08/18] Add omitted WindowStore, SessionStore test cases: GlobalStateStoreProviderTest, StreamThreadStateStoreProviderTest (Note: compare with KeyValueStore cases.) --- .../internals/ProcessorRecordContext.java | 1 - .../GlobalStateStoreProviderTest.java | 40 +++++++++++++++++++ .../StreamThreadStateStoreProviderTest.java | 27 +++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 2e979ef0188c..7eb4dc859513 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -101,7 +101,6 @@ public byte[] serialize() { final byte[][] headerKeysBytes; final byte[][] headerValuesBytes; - int size = 0; size += Long.BYTES; // value.context.timestamp size += Long.BYTES; // value.context.offset diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java index fd427e3439fc..851bb66ee170 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.ReadOnlySessionStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.TimestampedKeyValueStore; @@ -90,6 +91,14 @@ public void before() { false), Serdes.String(), Serdes.String()).build()); + stores.put( + "s-store", + Stores.sessionStoreBuilder( + Stores.inMemorySessionStore( + "s-store", + Duration.ofMillis(10L)), + Serdes.String(), + Serdes.String()).build()); final ProcessorContextImpl mockContext = niceMock(ProcessorContextImpl.class); expect(mockContext.applicationId()).andStubReturn("appId"); @@ -175,6 +184,26 @@ public void shouldReturnTimestampedKeyValueStoreAsKeyValueStore() { } } + @Test + public void shouldReturnWindowStore() { + final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores); + final List> stores = + provider.stores("w-store", QueryableStoreTypes.windowStore()); + assertEquals(1, stores.size()); + for (final ReadOnlyWindowStore store : stores) { + assertThat(store, instanceOf(ReadOnlyWindowStore.class)); + assertThat(store, not(instanceOf(TimestampedWindowStore.class))); + } + } + + @Test + public void shouldNotReturnWindowStoreAsTimestampedStore() { + final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores); + final List>> stores = + provider.stores("w-store", QueryableStoreTypes.timestampedWindowStore()); + assertEquals(0, stores.size()); + } + @Test public void shouldReturnTimestampedWindowStoreAsWindowStore() { final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores); @@ -186,4 +215,15 @@ public void shouldReturnTimestampedWindowStoreAsWindowStore() { assertThat(store, not(instanceOf(TimestampedWindowStore.class))); } } + + @Test + public void shouldReturnSessionStore() { + final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores); + final List> stores = + provider.stores("s-store", QueryableStoreTypes.sessionStore()); + assertEquals(1, stores.size()); + for (final ReadOnlySessionStore store : stores) { + assertThat(store, instanceOf(ReadOnlySessionStore.class)); + } + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index db827e873062..0ac170a86f8d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -46,6 +46,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.ReadOnlySessionStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.TimestampedKeyValueStore; @@ -125,6 +126,14 @@ public void before() { Serdes.String(), Serdes.String()), "the-processor"); + topology.addStateStore( + Stores.sessionStoreBuilder( + Stores.inMemorySessionStore( + "session-store", + Duration.ofMillis(10L)), + Serdes.String(), + Serdes.String()), + "the-processor"); final Properties properties = new Properties(); final String applicationId = "applicationId"; @@ -258,6 +267,17 @@ public void shouldFindTimestampedWindowStoresAsWindowStore() { } } + @Test + public void shouldFindSessionStores() { + mockThread(true); + final List> sessionStores = + provider.stores(StoreQueryParameters.fromNameAndType("session-store", QueryableStoreTypes.sessionStore())); + assertEquals(2, sessionStores.size()); + for (final ReadOnlySessionStore store: sessionStores) { + assertThat(store, instanceOf(ReadOnlySessionStore.class)); + } + } + @Test(expected = InvalidStateStoreException.class) public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() { mockThread(true); @@ -286,6 +306,13 @@ public void shouldThrowInvalidStoreExceptionIfTsWindowStoreClosed() { provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore())); } + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowInvalidStoreExceptionIfSessionStoreClosed() { + mockThread(true); + taskOne.getStore("session-store").close(); + provider.stores(StoreQueryParameters.fromNameAndType("session-store", QueryableStoreTypes.sessionStore())); + } + @Test public void shouldReturnEmptyListIfNoStoresFoundWithName() { mockThread(true); From d581d152530d43dd2f15684f1a766ee34afb572a Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 20 Jun 2020 20:14:44 +0900 Subject: [PATCH 09/18] Update [KeyValue,TimestampedKeyValue,WindowStore,TimestampedWindowStore]BuilderTest#shouldThrowNullPointerIfMetricsScopeIsNull: now checks Exception message. --- .../state/internals/KeyValueStoreBuilderTest.java | 12 ++++++++++-- .../state/internals/SessionStoreBuilderTest.java | 7 ++++++- .../TimestampedKeyValueStoreBuilderTest.java | 11 +++++++++-- .../TimestampedWindowStoreBuilderTest.java | 11 +++++++++-- .../state/internals/WindowStoreBuilderTest.java | 13 ++++++++++--- 5 files changed, 44 insertions(+), 10 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java index 1f4384a8c708..42d0ae1c4817 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java @@ -35,7 +35,9 @@ import java.util.Collections; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertThrows; @RunWith(EasyMockRunner.class) public class KeyValueStoreBuilderTest { @@ -134,9 +136,15 @@ public void shouldThrowNullPointerIfTimeIsNull() { new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + final Exception e = assertThrows(NullPointerException.class, + () -> new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + /* + * TODO: The exception is thrown from the constructor of AbstractStoreBuilder, since + * KeyValueStoreBuilder omits the MetricsScope nullity check in its constructor. + */ + assertThat(e.getMessage(), equalTo("name cannot be null")); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java index 3e3c1a7f9a49..6fcaa1120edd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java @@ -146,7 +146,12 @@ public void shouldThrowNullPointerIfTimeIsNull() { @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + final Exception e = assertThrows(NullPointerException.class, + () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + /* + * TODO: The exception is thrown from the constructor of AbstractStoreBuilder, since + * SessionStoreBuilder omits the MetricsScope nullity check in its constructor. + */ assertThat(e.getMessage(), equalTo("name cannot be null")); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java index c3ad6b329b44..fd392f844c34 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java @@ -36,6 +36,7 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertThrows; @@ -168,7 +169,13 @@ public void shouldThrowNullPointerIfTimeIsNull() { @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - assertThrows(NullPointerException.class, () -> new TimestampedKeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + final Exception e = assertThrows(NullPointerException.class, + () -> new TimestampedKeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + /* + * TODO: The exception is thrown from the constructor of AbstractStoreBuilder, since + * TimestampedKeyValueStoreBuilder omits the MetricsScope nullity check in its constructor. + */ + assertThat(e.getMessage(), equalTo("name cannot be null")); } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java index 1fae9bc244e6..b60797d50f3b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java @@ -39,6 +39,7 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; @@ -200,9 +201,15 @@ public void shouldThrowNullPointerIfTimeIsNull() { assertThrows(NullPointerException.class, () -> new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + final Exception e = assertThrows(NullPointerException.class, + () -> new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + /* + * TODO: The exception is thrown from the constructor of AbstractStoreBuilder, since + * TimestampedWindowStoreBuilder omits the MetricsScope nullity check in its constructor. + */ + assertThat(e.getMessage(), equalTo("name cannot be null")); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java index b4c96192765e..7874f8f45642 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java @@ -39,8 +39,10 @@ import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; @RunWith(EasyMockRunner.class) public class WindowStoreBuilderTest { @@ -154,9 +156,14 @@ public void shouldThrowNullPointerIfTimeIsNull() { new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + final Exception e = assertThrows(NullPointerException.class, + () -> new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + /* + * TODO: The exception is thrown from the constructor of AbstractStoreBuilder, since + * WindowStoreBuilder omits the MetricsScope nullity check in its constructor. + */ + assertThat(e.getMessage(), equalTo("name cannot be null")); } - } \ No newline at end of file From fc7194d838ce746a310d84be22f3f15a8f6640ce Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 20 Jun 2020 21:15:30 +0900 Subject: [PATCH 10/18] Give BytesStoreSupplier mock with name to [KeyValue,TimestampedKeyValue,WindowStore,TimestampedWindowStore]BuilderTest#shouldThrowNullPointerIfMetricsScopeIsNull; No exceptions are thrown now. --- .../internals/KeyValueStoreBuilderTest.java | 23 +++++++++------ .../internals/SessionStoreBuilderTest.java | 25 +++++++++++----- .../TimestampedKeyValueStoreBuilderTest.java | 22 +++++++------- .../TimestampedWindowStoreBuilderTest.java | 28 ++++++++++++------ .../internals/WindowStoreBuilderTest.java | 29 +++++++++++++------ 5 files changed, 83 insertions(+), 44 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java index 42d0ae1c4817..8539fdb09364 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java @@ -34,10 +34,12 @@ import java.util.Collections; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; @RunWith(EasyMockRunner.class) public class KeyValueStoreBuilderTest { @@ -138,13 +140,16 @@ public void shouldThrowNullPointerIfTimeIsNull() { @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - final Exception e = assertThrows(NullPointerException.class, - () -> new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); - /* - * TODO: The exception is thrown from the constructor of AbstractStoreBuilder, since - * KeyValueStoreBuilder omits the MetricsScope nullity check in its constructor. - */ - assertThat(e.getMessage(), equalTo("name cannot be null")); + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBStore("name", null)); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + try { + new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + } catch (final Exception e) { + fail(); + } } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java index 6fcaa1120edd..1da4daa91b3f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java @@ -40,6 +40,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; @RunWith(EasyMockRunner.class) public class SessionStoreBuilderTest { @@ -146,13 +147,23 @@ public void shouldThrowNullPointerIfTimeIsNull() { @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - final Exception e = assertThrows(NullPointerException.class, - () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); - /* - * TODO: The exception is thrown from the constructor of AbstractStoreBuilder, since - * SessionStoreBuilder omits the MetricsScope nullity check in its constructor. - */ - assertThat(e.getMessage(), equalTo("name cannot be null")); + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBSessionStore( + new RocksDBSegmentedBytesStore( + "name", + null, + 10L, + 5L, + new SessionKeySchema()) + )); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + try { + new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + } catch (final Exception e) { + fail(); + } } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java index fd392f844c34..4247cbbbc67b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java @@ -36,9 +36,8 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; @RunWith(EasyMockRunner.class) public class TimestampedKeyValueStoreBuilderTest { @@ -169,13 +168,16 @@ public void shouldThrowNullPointerIfTimeIsNull() { @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - final Exception e = assertThrows(NullPointerException.class, - () -> new TimestampedKeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); - /* - * TODO: The exception is thrown from the constructor of AbstractStoreBuilder, since - * TimestampedKeyValueStoreBuilder omits the MetricsScope nullity check in its constructor. - */ - assertThat(e.getMessage(), equalTo("name cannot be null")); + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBTimestampedStore("name", null)); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + try { + new TimestampedKeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + } catch (final Exception e) { + fail(); + } } -} +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java index b60797d50f3b..b6793222c46d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java @@ -39,10 +39,9 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; @RunWith(EasyMockRunner.class) public class TimestampedWindowStoreBuilderTest { @@ -203,13 +202,24 @@ public void shouldThrowNullPointerIfTimeIsNull() { @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - final Exception e = assertThrows(NullPointerException.class, - () -> new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); - /* - * TODO: The exception is thrown from the constructor of AbstractStoreBuilder, since - * TimestampedWindowStoreBuilder omits the MetricsScope nullity check in its constructor. - */ - assertThat(e.getMessage(), equalTo("name cannot be null")); + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBTimestampedWindowStore( + new RocksDBTimestampedSegmentedBytesStore( + "name", + null, + 10L, + 5L, + new WindowKeySchema()), + false, + 1L)); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + try { + new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + } catch (final Exception e) { + fail(); + } } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java index 7874f8f45642..601df8de2da1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java @@ -38,11 +38,11 @@ import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; @RunWith(EasyMockRunner.class) public class WindowStoreBuilderTest { @@ -158,12 +158,23 @@ public void shouldThrowNullPointerIfTimeIsNull() { @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - final Exception e = assertThrows(NullPointerException.class, - () -> new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); - /* - * TODO: The exception is thrown from the constructor of AbstractStoreBuilder, since - * WindowStoreBuilder omits the MetricsScope nullity check in its constructor. - */ - assertThat(e.getMessage(), equalTo("name cannot be null")); + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBWindowStore( + new RocksDBSegmentedBytesStore( + "name", + null, + 10L, + 5L, + new WindowKeySchema()), + false, + 1L)); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + try { + new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + } catch (final Exception e) { + fail(); + } } } \ No newline at end of file From df01c352d84fe1364e75978292c2b54500dc56b0 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 20 Jun 2020 21:38:13 +0900 Subject: [PATCH 11/18] Add metricsScope nullity check to [KeyValue,TimestampedKeyValue,Window,TimestampedWindow,Session]StoreBuilder constructors --- .../state/internals/KeyValueStoreBuilder.java | 1 + .../state/internals/SessionStoreBuilder.java | 1 + .../internals/TimestampedKeyValueStoreBuilder.java | 1 + .../internals/TimestampedWindowStoreBuilder.java | 1 + .../streams/state/internals/WindowStoreBuilder.java | 3 +++ .../state/internals/KeyValueStoreBuilderTest.java | 12 ++++++------ .../state/internals/SessionStoreBuilderTest.java | 10 ++++------ .../TimestampedKeyValueStoreBuilderTest.java | 12 ++++++------ .../TimestampedWindowStoreBuilderTest.java | 13 ++++++------- .../state/internals/WindowStoreBuilderTest.java | 12 ++++++------ 10 files changed, 35 insertions(+), 31 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java index 2071ca786292..cc641ced60ce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java @@ -34,6 +34,7 @@ public KeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier, final Time time) { super(storeSupplier.name(), keySerde, valueSerde, time); Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null"); + Objects.requireNonNull(storeSupplier.metricsScope(), "bytesStoreSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java index 51ef319da94e..6010433571b6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java @@ -34,6 +34,7 @@ public SessionStoreBuilder(final SessionBytesStoreSupplier storeSupplier, final Serde valueSerde, final Time time) { super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time); + Objects.requireNonNull(storeSupplier.metricsScope(), "supplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java index be8f25936684..fbad4fb07934 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java @@ -47,6 +47,7 @@ public TimestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSup valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde), time); Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null"); + Objects.requireNonNull(storeSupplier.metricsScope(), "bytesStoreSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java index a54426216c24..e3fa1b0ea494 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java @@ -47,6 +47,7 @@ public TimestampedWindowStoreBuilder(final WindowBytesStoreSupplier storeSupplie final Time time) { super(storeSupplier.name(), keySerde, valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde), time); Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null"); + Objects.requireNonNull(storeSupplier.metricsScope(), "bytesStoreSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java index 722564572093..208ddc39a7f4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java @@ -24,6 +24,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Objects; + public class WindowStoreBuilder extends AbstractStoreBuilder> { private final Logger log = LoggerFactory.getLogger(WindowStoreBuilder.class); @@ -34,6 +36,7 @@ public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier, final Serde valueSerde, final Time time) { super(storeSupplier.name(), keySerde, valueSerde, time); + Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java index 8539fdb09364..a766325a6ac2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java @@ -38,8 +38,9 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertThrows; @RunWith(EasyMockRunner.class) public class KeyValueStoreBuilderTest { @@ -54,6 +55,7 @@ public class KeyValueStoreBuilderTest { public void setUp() { EasyMock.expect(supplier.get()).andReturn(inner); EasyMock.expect(supplier.name()).andReturn("name"); + expect(supplier.metricsScope()).andReturn("metricScope"); EasyMock.replay(supplier); builder = new KeyValueStoreBuilder<>( supplier, @@ -145,11 +147,9 @@ public void shouldThrowNullPointerIfMetricsScopeIsNull() { expect(supplier.name()).andReturn("name"); replay(supplier); - try { - new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); - } catch (final Exception e) { - fail(); - } + final Exception e = assertThrows(NullPointerException.class, + () -> new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("bytesStoreSupplier's metricsScope can't be null")); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java index 1da4daa91b3f..c2379d3cd344 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java @@ -40,7 +40,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; @RunWith(EasyMockRunner.class) public class SessionStoreBuilderTest { @@ -56,6 +55,7 @@ public void setUp() throws Exception { expect(supplier.get()).andReturn(inner); expect(supplier.name()).andReturn("name"); + expect(supplier.metricsScope()).andReturn("metricScope"); replay(supplier); builder = new SessionStoreBuilder<>( @@ -159,11 +159,9 @@ public void shouldThrowNullPointerIfMetricsScopeIsNull() { expect(supplier.name()).andReturn("name"); replay(supplier); - try { - new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); - } catch (final Exception e) { - fail(); - } + final Exception e = assertThrows(NullPointerException.class, + () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("supplier's metricsScope can't be null")); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java index 4247cbbbc67b..bfa6acf407c9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java @@ -36,8 +36,9 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertThrows; @RunWith(EasyMockRunner.class) public class TimestampedKeyValueStoreBuilderTest { @@ -52,6 +53,7 @@ public class TimestampedKeyValueStoreBuilderTest { public void setUp() { expect(supplier.get()).andReturn(inner); expect(supplier.name()).andReturn("name"); + expect(supplier.metricsScope()).andReturn("metricScope"); expect(inner.persistent()).andReturn(true).anyTimes(); replay(supplier, inner); @@ -173,11 +175,9 @@ public void shouldThrowNullPointerIfMetricsScopeIsNull() { expect(supplier.name()).andReturn("name"); replay(supplier); - try { - new TimestampedKeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); - } catch (final Exception e) { - fail(); - } + final Exception e = assertThrows(NullPointerException.class, + () -> new TimestampedKeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("bytesStoreSupplier's metricsScope can't be null")); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java index b6793222c46d..896076932817 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java @@ -39,9 +39,10 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertThrows; @RunWith(EasyMockRunner.class) public class TimestampedWindowStoreBuilderTest { @@ -56,6 +57,7 @@ public class TimestampedWindowStoreBuilderTest { public void setUp() { expect(supplier.get()).andReturn(inner); expect(supplier.name()).andReturn("name"); + expect(supplier.metricsScope()).andReturn("metricScope"); expect(inner.persistent()).andReturn(true).anyTimes(); replay(supplier, inner); @@ -214,12 +216,9 @@ public void shouldThrowNullPointerIfMetricsScopeIsNull() { 1L)); expect(supplier.name()).andReturn("name"); replay(supplier); - - try { - new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); - } catch (final Exception e) { - fail(); - } + final Exception e = assertThrows(NullPointerException.class, + () -> new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("bytesStoreSupplier's metricsScope can't be null")); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java index 601df8de2da1..e8dc286a0b19 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java @@ -40,9 +40,10 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertThrows; @RunWith(EasyMockRunner.class) public class WindowStoreBuilderTest { @@ -57,6 +58,7 @@ public class WindowStoreBuilderTest { public void setUp() { expect(supplier.get()).andReturn(inner); expect(supplier.name()).andReturn("name"); + expect(supplier.metricsScope()).andReturn("metricScope"); replay(supplier); builder = new WindowStoreBuilder<>( @@ -171,10 +173,8 @@ public void shouldThrowNullPointerIfMetricsScopeIsNull() { expect(supplier.name()).andReturn("name"); replay(supplier); - try { - new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); - } catch (final Exception e) { - fail(); - } + final Exception e = assertThrows(NullPointerException.class, + () -> new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null")); } } \ No newline at end of file From 29d766afe504cade3fd8dfb8cf966e0cb83c67bf Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 20 Jun 2020 21:40:20 +0900 Subject: [PATCH 12/18] Add nullity check in 'WindowStoreBuilder' constructor. (For parity with 'TimestampedWindowStoreBuilder', 'SessionStoreBuilder', etc.) --- .../apache/kafka/streams/state/internals/WindowStoreBuilder.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java index 208ddc39a7f4..5876f78c64e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java @@ -36,6 +36,7 @@ public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier, final Serde valueSerde, final Time time) { super(storeSupplier.name(), keySerde, valueSerde, time); + Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } From ac6ac58c18513f653b8757dee4a686cb431e9779 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 20 Jun 2020 21:42:02 +0900 Subject: [PATCH 13/18] Remove unthrown Exception in SessionStoreBuilderTest#setUp --- .../kafka/streams/state/internals/SessionStoreBuilderTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java index c2379d3cd344..9de051b82b65 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java @@ -51,8 +51,7 @@ public class SessionStoreBuilderTest { private SessionStoreBuilder builder; @Before - public void setUp() throws Exception { - + public void setUp() { expect(supplier.get()).andReturn(inner); expect(supplier.name()).andReturn("name"); expect(supplier.metricsScope()).andReturn("metricScope"); From 958cb5b5819d2d65f353c445b94f58915f7e5853 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 20 Jun 2020 21:54:24 +0900 Subject: [PATCH 14/18] Fix typo in `XXXStoreBuilder` validation methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `KeyValueStoreBuilder`: `bytesStoreSupplier` → `storeSupplier` - `TimestampedKeyValueStoreBuilder`: `bytesStoreSupplier` → `storeSupplier` - `TimestampedWindowStoreBuilder`: `bytesStoreSupplier` → `storeSupplier` - `SessionStoreBuilder`: `supplier` → `storeSupplier` --- .../kafka/streams/state/internals/KeyValueStoreBuilder.java | 4 ++-- .../kafka/streams/state/internals/SessionStoreBuilder.java | 4 ++-- .../state/internals/TimestampedKeyValueStoreBuilder.java | 4 ++-- .../state/internals/TimestampedWindowStoreBuilder.java | 4 ++-- .../streams/state/internals/KeyValueStoreBuilderTest.java | 2 +- .../streams/state/internals/SessionStoreBuilderTest.java | 4 ++-- .../state/internals/TimestampedKeyValueStoreBuilderTest.java | 2 +- .../state/internals/TimestampedWindowStoreBuilderTest.java | 2 +- 8 files changed, 13 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java index cc641ced60ce..7888316bacf7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java @@ -33,8 +33,8 @@ public KeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier, final Serde valueSerde, final Time time) { super(storeSupplier.name(), keySerde, valueSerde, time); - Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null"); - Objects.requireNonNull(storeSupplier.metricsScope(), "bytesStoreSupplier's metricsScope can't be null"); + Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); + Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java index 6010433571b6..d0d039420673 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java @@ -33,8 +33,8 @@ public SessionStoreBuilder(final SessionBytesStoreSupplier storeSupplier, final Serde keySerde, final Serde valueSerde, final Time time) { - super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time); - Objects.requireNonNull(storeSupplier.metricsScope(), "supplier's metricsScope can't be null"); + super(Objects.requireNonNull(storeSupplier, "storeSupplier cannot be null").name(), keySerde, valueSerde, time); + Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java index fbad4fb07934..cea10887e18a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java @@ -46,8 +46,8 @@ public TimestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSup keySerde, valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde), time); - Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null"); - Objects.requireNonNull(storeSupplier.metricsScope(), "bytesStoreSupplier's metricsScope can't be null"); + Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); + Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java index e3fa1b0ea494..290b0ff17160 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java @@ -46,8 +46,8 @@ public TimestampedWindowStoreBuilder(final WindowBytesStoreSupplier storeSupplie final Serde valueSerde, final Time time) { super(storeSupplier.name(), keySerde, valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde), time); - Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null"); - Objects.requireNonNull(storeSupplier.metricsScope(), "bytesStoreSupplier's metricsScope can't be null"); + Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); + Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java index a766325a6ac2..cb6d848dd2e0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java @@ -149,7 +149,7 @@ public void shouldThrowNullPointerIfMetricsScopeIsNull() { final Exception e = assertThrows(NullPointerException.class, () -> new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); - assertThat(e.getMessage(), equalTo("bytesStoreSupplier's metricsScope can't be null")); + assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null")); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java index 9de051b82b65..352b0cbe7779 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java @@ -120,7 +120,7 @@ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() { @Test public void shouldThrowNullPointerIfInnerIsNull() { final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime())); - assertThat(e.getMessage(), equalTo("supplier cannot be null")); + assertThat(e.getMessage(), equalTo("storeSupplier cannot be null")); } @Test @@ -160,7 +160,7 @@ public void shouldThrowNullPointerIfMetricsScopeIsNull() { final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); - assertThat(e.getMessage(), equalTo("supplier's metricsScope can't be null")); + assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null")); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java index bfa6acf407c9..b79d67e03092 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java @@ -177,7 +177,7 @@ public void shouldThrowNullPointerIfMetricsScopeIsNull() { final Exception e = assertThrows(NullPointerException.class, () -> new TimestampedKeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); - assertThat(e.getMessage(), equalTo("bytesStoreSupplier's metricsScope can't be null")); + assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null")); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java index 896076932817..586ec73ea66f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java @@ -218,7 +218,7 @@ public void shouldThrowNullPointerIfMetricsScopeIsNull() { replay(supplier); final Exception e = assertThrows(NullPointerException.class, () -> new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); - assertThat(e.getMessage(), equalTo("bytesStoreSupplier's metricsScope can't be null")); + assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null")); } } \ No newline at end of file From dff84f2cf87a54fe7b6ce55ef0ac4371eb86af18 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 20 Jun 2020 21:55:22 +0900 Subject: [PATCH 15/18] Remove unused variables: RocksDBStore, SuppressionIntegrationTest --- .../kafka/streams/integration/SuppressionIntegrationTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index 9d7c23d73c63..800ea9652d53 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -441,7 +441,6 @@ public void shouldAllowDisablingChangelog() { final String input = "input" + testId; final String outputSuppressed = "output-suppressed" + testId; final String outputRaw = "output-raw" + testId; - final String changeLog = "suppressionintegrationtest-shouldAllowDisablingChangelog-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog"; cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); From e92dda54b2d23ef95cad3e575f4bc9a6df09641d Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 20 Jun 2020 21:57:35 +0900 Subject: [PATCH 16/18] Remove redundant type parameters from AbstractJoinIntegrationTest, SuppressScenarioTest --- .../kafka/streams/integration/AbstractJoinIntegrationTest.java | 2 +- .../kafka/streams/kstream/internals/SuppressScenarioTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index d7e19c7e9e89..43f4d6d8e02f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -190,7 +190,7 @@ void runTestWithDriver(final TestRecord expectedFinalResult, final } final TestRecord updatedExpectedFinalResult = - new TestRecord( + new TestRecord<>( expectedFinalResult.key(), expectedFinalResult.value(), null, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java index c5c06c0de440..c3528272e04b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -816,7 +816,7 @@ private static void verify(final List> results, for (final TestRecord result : results) { final KeyValueTimestamp expected = expectedIterator.next(); try { - assertThat(result, equalTo(new TestRecord(expected.key(), expected.value(), null, expected.timestamp()))); + assertThat(result, equalTo(new TestRecord<>(expected.key(), expected.value(), null, expected.timestamp()))); } catch (final AssertionError e) { throw new AssertionError(printRecords(results) + " != " + expectedResults, e); } From 6972309253301b252acdf85d48d0dba758cff5b5 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Thu, 8 Oct 2020 00:50:34 +0900 Subject: [PATCH 17/18] Revert WindowedSerdes#timeWindowedSerdeFrom(Class, long) --- .../org/apache/kafka/streams/kstream/WindowedSerdes.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java index 06ce7b719f94..07beb5a2a2f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java @@ -64,6 +64,14 @@ static public Serde> timeWindowedSerdeFrom(final Class type) return new TimeWindowedSerde<>(Serdes.serdeFrom(type)); } + /** + * Construct a {@code TimeWindowedSerde} object to deserialize changelog topic + * for the specified inner class type and window size. + */ + static public Serde> timeWindowedSerdeFrom(final Class type, final long windowSize) { + return new TimeWindowedSerde<>(Serdes.serdeFrom(type), windowSize); + } + /** * Construct a {@code SessionWindowedSerde} object for the specified inner class type. */ From 9f7a73ff0e1027602e7590cea21e2c5973a76973 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Thu, 8 Oct 2020 01:01:52 +0900 Subject: [PATCH 18/18] Add supplier mock object in TimestampedKeyValueStoreMaterializerTest#shouldCreateKeyValueStoreWithTheProvidedInnerStore to return metricScope: without it, NullPointerExcepion is thrown during TimestampedKeyValueStore instance creation. --- .../internals/TimestampedKeyValueStoreMaterializerTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java index f963786a3c3c..77f4ab8f0df4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java @@ -111,6 +111,7 @@ public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() { final InMemoryKeyValueStore store = new InMemoryKeyValueStore("name"); EasyMock.expect(supplier.name()).andReturn("name").anyTimes(); EasyMock.expect(supplier.get()).andReturn(store); + EasyMock.expect(supplier.metricsScope()).andReturn("metricScope"); EasyMock.replay(supplier); final MaterializedInternal> materialized =