diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index 5de51279f193b..f1c64c2d4fb86 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -44,12 +44,11 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.test.KeyValueIteratorStub; -import org.easymock.EasyMockRule; -import org.easymock.Mock; -import org.easymock.MockType; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; import java.util.Collections; import java.util.List; @@ -58,15 +57,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.aryEq; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.mock; -import static org.easymock.EasyMock.niceMock; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; @@ -75,12 +65,16 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; - +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class MeteredSessionStoreTest { - @Rule - public EasyMockRule rule = new EasyMockRule(this); - private static final String APPLICATION_ID = "test-app"; private static final String STORE_TYPE = "scope"; private static final String STORE_NAME = "mocked-store"; @@ -101,9 +95,9 @@ public class MeteredSessionStoreTest { private final TaskId taskId = new TaskId(0, 0, "My-Topology"); private final Metrics metrics = new Metrics(); private MeteredSessionStore store; - @Mock(type = MockType.NICE) + @Mock private SessionStore innerStore; - @Mock(type = MockType.NICE) + @Mock private InternalProcessorContext context; private Map tags; @@ -119,12 +113,12 @@ public void before() { mockTime ); metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); - expect(context.applicationId()).andStubReturn(APPLICATION_ID); - expect(context.metrics()) - .andStubReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime)); - expect(context.taskId()).andStubReturn(taskId); - expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC); - expect(innerStore.name()).andStubReturn(STORE_NAME); + when(context.applicationId()).thenReturn(APPLICATION_ID); + when(context.metrics()) + .thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime)); + when(context.taskId()).thenReturn(taskId); + when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC); + when(innerStore.name()).thenReturn(STORE_NAME); tags = mkMap( mkEntry(THREAD_ID_TAG_KEY, threadId), mkEntry("task-id", taskId.toString()), @@ -133,45 +127,34 @@ public void before() { } private void init() { - replay(innerStore, context); store.init((StateStoreContext) context, store); } @SuppressWarnings("deprecation") @Test public void shouldDelegateDeprecatedInit() { - final SessionStore inner = mock(SessionStore.class); final MeteredSessionStore outer = new MeteredSessionStore<>( - inner, + innerStore, STORE_TYPE, Serdes.String(), Serdes.String(), new MockTime() ); - expect(inner.name()).andStubReturn("store"); - inner.init((ProcessorContext) context, outer); - expectLastCall(); - replay(inner, context); + doNothing().when(innerStore).init((ProcessorContext) context, outer); outer.init((ProcessorContext) context, outer); - verify(inner); } @Test public void shouldDelegateInit() { - final SessionStore inner = mock(SessionStore.class); final MeteredSessionStore outer = new MeteredSessionStore<>( - inner, + innerStore, STORE_TYPE, Serdes.String(), Serdes.String(), new MockTime() ); - expect(inner.name()).andStubReturn("store"); - inner.init((StateStoreContext) context, outer); - expectLastCall(); - replay(inner, context); + doNothing().when(innerStore).init((StateStoreContext) context, outer); outer.init((StateStoreContext) context, outer); - verify(inner); } @Test @@ -183,24 +166,24 @@ public void shouldPassChangelogTopicNameToStateStoreSerde() { public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() { final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, taskId.topologyName()); - expect(context.changelogFor(STORE_NAME)).andReturn(null); + when(context.changelogFor(STORE_NAME)).thenReturn(null); doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName); } + @SuppressWarnings("unchecked") private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) { - final Serde keySerde = niceMock(Serde.class); + final Serde keySerde = mock(Serde.class); final Serializer keySerializer = mock(Serializer.class); - final Serde valueSerde = niceMock(Serde.class); + final Serde valueSerde = mock(Serde.class); final Deserializer valueDeserializer = mock(Deserializer.class); final Serializer valueSerializer = mock(Serializer.class); - expect(keySerde.serializer()).andStubReturn(keySerializer); - expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes()); - expect(valueSerde.deserializer()).andStubReturn(valueDeserializer); - expect(valueDeserializer.deserialize(topic, VALUE_BYTES)).andStubReturn(VALUE); - expect(valueSerde.serializer()).andStubReturn(valueSerializer); - expect(valueSerializer.serialize(topic, VALUE)).andStubReturn(VALUE_BYTES); - expect(innerStore.fetchSession(KEY_BYTES, START_TIMESTAMP, END_TIMESTAMP)).andStubReturn(VALUE_BYTES); - replay(innerStore, context, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde); + when(keySerde.serializer()).thenReturn(keySerializer); + when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes()); + when(valueSerde.deserializer()).thenReturn(valueDeserializer); + when(valueDeserializer.deserialize(topic, VALUE_BYTES)).thenReturn(VALUE); + when(valueSerde.serializer()).thenReturn(valueSerializer); + when(valueSerializer.serialize(topic, VALUE)).thenReturn(VALUE_BYTES); + when(innerStore.fetchSession(KEY_BYTES, START_TIMESTAMP, END_TIMESTAMP)).thenReturn(VALUE_BYTES); store = new MeteredSessionStore<>( innerStore, STORE_TYPE, @@ -212,8 +195,6 @@ private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) store.fetchSession(KEY, START_TIMESTAMP, END_TIMESTAMP); store.put(WINDOWED_KEY, VALUE); - - verify(keySerializer, valueDeserializer, valueSerializer); } @Test @@ -237,8 +218,7 @@ public void testMetrics() { @Test public void shouldWriteBytesToInnerStoreAndRecordPutMetric() { - innerStore.put(eq(WINDOWED_KEY_BYTES), aryEq(VALUE_BYTES)); - expectLastCall(); + doNothing().when(innerStore).put(WINDOWED_KEY_BYTES, VALUE_BYTES); init(); store.put(WINDOWED_KEY, VALUE); @@ -247,13 +227,12 @@ public void shouldWriteBytesToInnerStoreAndRecordPutMetric() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("put-rate"); assertTrue(((Double) metric.metricValue()) > 0); - verify(innerStore); } @Test public void shouldFindSessionsFromStoreAndRecordFetchMetric() { - expect(innerStore.findSessions(KEY_BYTES, 0, 0)) - .andReturn(new KeyValueIteratorStub<>( + when(innerStore.findSessions(KEY_BYTES, 0, 0)) + .thenReturn(new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator())); init(); @@ -266,13 +245,12 @@ public void shouldFindSessionsFromStoreAndRecordFetchMetric() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(innerStore); } @Test public void shouldBackwardFindSessionsFromStoreAndRecordFetchMetric() { - expect(innerStore.backwardFindSessions(KEY_BYTES, 0, 0)) - .andReturn( + when(innerStore.backwardFindSessions(KEY_BYTES, 0, 0)) + .thenReturn( new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() ) @@ -288,13 +266,12 @@ public void shouldBackwardFindSessionsFromStoreAndRecordFetchMetric() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(innerStore); } @Test public void shouldFindSessionRangeFromStoreAndRecordFetchMetric() { - expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, 0, 0)) - .andReturn(new KeyValueIteratorStub<>( + when(innerStore.findSessions(KEY_BYTES, KEY_BYTES, 0, 0)) + .thenReturn(new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator())); init(); @@ -307,13 +284,12 @@ public void shouldFindSessionRangeFromStoreAndRecordFetchMetric() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(innerStore); } @Test public void shouldBackwardFindSessionRangeFromStoreAndRecordFetchMetric() { - expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, 0, 0)) - .andReturn( + when(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, 0, 0)) + .thenReturn( new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() ) @@ -329,13 +305,11 @@ public void shouldBackwardFindSessionRangeFromStoreAndRecordFetchMetric() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(innerStore); } @Test public void shouldRemoveFromStoreAndRecordRemoveMetric() { - innerStore.remove(WINDOWED_KEY_BYTES); - expectLastCall(); + doNothing().when(innerStore).remove(WINDOWED_KEY_BYTES); init(); @@ -345,13 +319,12 @@ public void shouldRemoveFromStoreAndRecordRemoveMetric() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("remove-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(innerStore); } @Test public void shouldFetchForKeyAndRecordFetchMetric() { - expect(innerStore.fetch(KEY_BYTES)) - .andReturn(new KeyValueIteratorStub<>( + when(innerStore.fetch(KEY_BYTES)) + .thenReturn(new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator())); init(); @@ -364,13 +337,12 @@ public void shouldFetchForKeyAndRecordFetchMetric() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(innerStore); } @Test public void shouldBackwardFetchForKeyAndRecordFetchMetric() { - expect(innerStore.backwardFetch(KEY_BYTES)) - .andReturn( + when(innerStore.backwardFetch(KEY_BYTES)) + .thenReturn( new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() ) @@ -386,13 +358,12 @@ public void shouldBackwardFetchForKeyAndRecordFetchMetric() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(innerStore); } @Test public void shouldFetchRangeFromStoreAndRecordFetchMetric() { - expect(innerStore.fetch(KEY_BYTES, KEY_BYTES)) - .andReturn(new KeyValueIteratorStub<>( + when(innerStore.fetch(KEY_BYTES, KEY_BYTES)) + .thenReturn(new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator())); init(); @@ -405,13 +376,12 @@ public void shouldFetchRangeFromStoreAndRecordFetchMetric() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(innerStore); } @Test public void shouldBackwardFetchRangeFromStoreAndRecordFetchMetric() { - expect(innerStore.backwardFetch(KEY_BYTES, KEY_BYTES)) - .andReturn( + when(innerStore.backwardFetch(KEY_BYTES, KEY_BYTES)) + .thenReturn( new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, VALUE_BYTES)).iterator() ) @@ -427,14 +397,13 @@ public void shouldBackwardFetchRangeFromStoreAndRecordFetchMetric() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(innerStore); } @Test public void shouldReturnNoSessionsWhenFetchedKeyHasExpired() { final long systemTime = Time.SYSTEM.milliseconds(); - expect(innerStore.findSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) - .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); + when(innerStore.findSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) + .thenReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); init(); final KeyValueIterator, String> iterator = store.findSessions(KEY, systemTime - RETENTION_PERIOD, systemTime); @@ -445,8 +414,8 @@ public void shouldReturnNoSessionsWhenFetchedKeyHasExpired() { @Test public void shouldReturnNoSessionsInBackwardOrderWhenFetchedKeyHasExpired() { final long systemTime = Time.SYSTEM.milliseconds(); - expect(innerStore.backwardFindSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) - .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); + when(innerStore.backwardFindSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) + .thenReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); init(); final KeyValueIterator, String> iterator = store.backwardFindSessions(KEY, systemTime - RETENTION_PERIOD, systemTime); @@ -457,8 +426,8 @@ public void shouldReturnNoSessionsInBackwardOrderWhenFetchedKeyHasExpired() { @Test public void shouldNotFindExpiredSessionRangeFromStore() { final long systemTime = Time.SYSTEM.milliseconds(); - expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) - .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); + when(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) + .thenReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); init(); final KeyValueIterator, String> iterator = store.findSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime); @@ -469,8 +438,8 @@ public void shouldNotFindExpiredSessionRangeFromStore() { @Test public void shouldNotFindExpiredSessionRangeInBackwardOrderFromStore() { final long systemTime = Time.SYSTEM.milliseconds(); - expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) - .andReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); + when(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, systemTime - RETENTION_PERIOD, systemTime)) + .thenReturn(new KeyValueIteratorStub<>(KeyValueIterators.emptyIterator())); init(); final KeyValueIterator, String> iterator = store.backwardFindSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime); @@ -490,7 +459,7 @@ public void shouldRecordRestoreTimeOnInit() { @Test public void shouldNotThrowNullPointerExceptionIfFetchSessionReturnsNull() { - expect(innerStore.fetchSession(Bytes.wrap("a".getBytes()), 0, Long.MAX_VALUE)).andReturn(null); + when(innerStore.fetchSession(Bytes.wrap("a".getBytes()), 0, Long.MAX_VALUE)).thenReturn(null); init(); assertNull(store.fetchSession("a", 0, Long.MAX_VALUE)); @@ -598,8 +567,7 @@ private interface CachedSessionStore extends SessionStore, Cached public void shouldSetFlushListenerOnWrappedCachingStore() { final CachedSessionStore cachedSessionStore = mock(CachedSessionStore.class); - expect(cachedSessionStore.setFlushListener(anyObject(CacheFlushListener.class), eq(false))).andReturn(true); - replay(cachedSessionStore); + when(cachedSessionStore.setFlushListener(any(CacheFlushListener.class), eq(false))).thenReturn(true); store = new MeteredSessionStore<>( cachedSessionStore, @@ -608,8 +576,6 @@ public void shouldSetFlushListenerOnWrappedCachingStore() { Serdes.String(), new MockTime()); assertTrue(store.setFlushListener(null, false)); - - verify(cachedSessionStore); } @Test @@ -619,27 +585,23 @@ public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() { @Test public void shouldRemoveMetricsOnClose() { - innerStore.close(); - expectLastCall(); + doNothing().when(innerStore).close(); init(); // replays "inner" // There's always a "count" metric registered assertThat(storeMetrics(), not(empty())); store.close(); assertThat(storeMetrics(), empty()); - verify(innerStore); } @Test public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() { - innerStore.close(); - expectLastCall().andThrow(new RuntimeException("Oops!")); + doThrow(new RuntimeException("Oops!")).when(innerStore).close(); init(); // replays "inner" assertThat(storeMetrics(), not(empty())); assertThrows(RuntimeException.class, store::close); assertThat(storeMetrics(), empty()); - verify(innerStore); } private KafkaMetric metric(final String name) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 20eb5ec88a192..952a52dbff277 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -44,6 +44,8 @@ import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; import java.time.temporal.ChronoUnit; import java.util.List; @@ -53,15 +55,6 @@ import static java.time.Instant.ofEpochMilli; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.mock; -import static org.easymock.EasyMock.niceMock; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThan; @@ -70,7 +63,14 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; - +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class MeteredWindowStoreTest { private static final String STORE_TYPE = "scope"; @@ -88,7 +88,8 @@ public class MeteredWindowStoreTest { private final String threadId = Thread.currentThread().getName(); private InternalMockProcessorContext context; - private final WindowStore innerStoreMock = createNiceMock(WindowStore.class); + @SuppressWarnings("unchecked") + private final WindowStore innerStoreMock = mock(WindowStore.class); private MeteredWindowStore store = new MeteredWindowStore<>( innerStoreMock, WINDOW_SIZE_MS, // any size @@ -101,7 +102,7 @@ public class MeteredWindowStoreTest { private Map tags; { - expect(innerStoreMock.name()).andReturn(STORE_NAME).anyTimes(); + when(innerStoreMock.name()).thenReturn(STORE_NAME); } @Before @@ -128,40 +129,32 @@ public void setUp() { @SuppressWarnings("deprecation") @Test public void shouldDelegateDeprecatedInit() { - final WindowStore inner = mock(WindowStore.class); final MeteredWindowStore outer = new MeteredWindowStore<>( - inner, + innerStoreMock, WINDOW_SIZE_MS, // any size STORE_TYPE, new MockTime(), Serdes.String(), new SerdeThatDoesntHandleNull() ); - expect(inner.name()).andStubReturn("store"); - inner.init((ProcessorContext) context, outer); - expectLastCall(); - replay(inner); + when(innerStoreMock.name()).thenReturn("store"); + doNothing().when(innerStoreMock).init((ProcessorContext) context, outer); outer.init((ProcessorContext) context, outer); - verify(inner); } @Test public void shouldDelegateInit() { - final WindowStore inner = mock(WindowStore.class); final MeteredWindowStore outer = new MeteredWindowStore<>( - inner, + innerStoreMock, WINDOW_SIZE_MS, // any size STORE_TYPE, new MockTime(), Serdes.String(), new SerdeThatDoesntHandleNull() ); - expect(inner.name()).andStubReturn("store"); - inner.init((StateStoreContext) context, outer); - expectLastCall(); - replay(inner); + when(innerStoreMock.name()).thenReturn("store"); + doNothing().when(innerStoreMock).init((StateStoreContext) context, outer); outer.init((StateStoreContext) context, outer); - verify(inner); } @Test @@ -177,20 +170,20 @@ public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisable doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName); } + @SuppressWarnings("unchecked") private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) { - final Serde keySerde = niceMock(Serde.class); + final Serde keySerde = mock(Serde.class); final Serializer keySerializer = mock(Serializer.class); - final Serde valueSerde = niceMock(Serde.class); + final Serde valueSerde = mock(Serde.class); final Deserializer valueDeserializer = mock(Deserializer.class); final Serializer valueSerializer = mock(Serializer.class); - expect(keySerde.serializer()).andStubReturn(keySerializer); - expect(keySerializer.serialize(topic, KEY)).andStubReturn(KEY.getBytes()); - expect(valueSerde.deserializer()).andStubReturn(valueDeserializer); - expect(valueDeserializer.deserialize(topic, VALUE_BYTES)).andStubReturn(VALUE); - expect(valueSerde.serializer()).andStubReturn(valueSerializer); - expect(valueSerializer.serialize(topic, VALUE)).andStubReturn(VALUE_BYTES); - expect(innerStoreMock.fetch(KEY_BYTES, TIMESTAMP)).andStubReturn(VALUE_BYTES); - replay(innerStoreMock, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde); + when(keySerde.serializer()).thenReturn(keySerializer); + when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes()); + when(valueSerde.deserializer()).thenReturn(valueDeserializer); + when(valueDeserializer.deserialize(topic, VALUE_BYTES)).thenReturn(VALUE); + when(valueSerde.serializer()).thenReturn(valueSerializer); + when(valueSerializer.serialize(topic, VALUE)).thenReturn(VALUE_BYTES); + when(innerStoreMock.fetch(KEY_BYTES, TIMESTAMP)).thenReturn(VALUE_BYTES); store = new MeteredWindowStore<>( innerStoreMock, WINDOW_SIZE_MS, @@ -203,13 +196,10 @@ private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) store.fetch(KEY, TIMESTAMP); store.put(KEY, VALUE, TIMESTAMP); - - verify(keySerializer, valueDeserializer, valueSerializer); } @Test public void testMetrics() { - replay(innerStoreMock); store.init((StateStoreContext) context, store); final JmxReporter reporter = new JmxReporter(); final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams"); @@ -229,22 +219,19 @@ public void testMetrics() { @Test public void shouldRecordRestoreLatencyOnInit() { - innerStoreMock.init((StateStoreContext) context, store); - replay(innerStoreMock); + doNothing().when(innerStoreMock).init((StateStoreContext) context, store); store.init((StateStoreContext) context, store); // it suffices to verify one restore metric since all restore metrics are recorded by the same sensor // and the sensor is tested elsewhere final KafkaMetric metric = metric("restore-rate"); assertThat((Double) metric.metricValue(), greaterThan(0.0)); - verify(innerStoreMock); } @Test public void shouldPutToInnerStoreAndRecordPutMetrics() { final byte[] bytes = "a".getBytes(); - innerStoreMock.put(eq(Bytes.wrap(bytes)), anyObject(), eq(context.timestamp())); - replay(innerStoreMock); + doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), eq(context.timestamp())); store.init((StateStoreContext) context, store); store.put("a", "a", context.timestamp()); @@ -253,14 +240,12 @@ public void shouldPutToInnerStoreAndRecordPutMetrics() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("put-rate"); assertThat((Double) metric.metricValue(), greaterThan(0.0)); - verify(innerStoreMock); } @Test public void shouldFetchFromInnerStoreAndRecordFetchMetrics() { - expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)) - .andReturn(KeyValueIterators.emptyWindowStoreIterator()); - replay(innerStoreMock); + when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)) + .thenReturn(KeyValueIterators.emptyWindowStoreIterator()); store.init((StateStoreContext) context, store); store.fetch("a", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; @@ -269,32 +254,27 @@ public void shouldFetchFromInnerStoreAndRecordFetchMetrics() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertThat((Double) metric.metricValue(), greaterThan(0.0)); - verify(innerStoreMock); } @Test public void shouldReturnNoRecordWhenFetchedKeyHasExpired() { - expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1 + RETENTION_PERIOD)) - .andReturn(KeyValueIterators.emptyWindowStoreIterator()); - replay(innerStoreMock); + when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1 + RETENTION_PERIOD)) + .thenReturn(KeyValueIterators.emptyWindowStoreIterator()); store.init((StateStoreContext) context, store); store.fetch("a", ofEpochMilli(1), ofEpochMilli(1).plus(RETENTION_PERIOD, ChronoUnit.MILLIS)).close(); // recorded on close; - - verify(innerStoreMock); } @Test public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() { - expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)) - .andReturn(KeyValueIterators.emptyIterator()); - expect(innerStoreMock.fetch(null, Bytes.wrap("b".getBytes()), 1, 1)) - .andReturn(KeyValueIterators.emptyIterator()); - expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), null, 1, 1)) - .andReturn(KeyValueIterators.emptyIterator()); - expect(innerStoreMock.fetch(null, null, 1, 1)) - .andReturn(KeyValueIterators.emptyIterator()); - replay(innerStoreMock); + when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)) + .thenReturn(KeyValueIterators.emptyIterator()); + when(innerStoreMock.fetch(null, Bytes.wrap("b".getBytes()), 1, 1)) + .thenReturn(KeyValueIterators.emptyIterator()); + when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), null, 1, 1)) + .thenReturn(KeyValueIterators.emptyIterator()); + when(innerStoreMock.fetch(null, null, 1, 1)) + .thenReturn(KeyValueIterators.emptyIterator()); store.init((StateStoreContext) context, store); store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; @@ -306,14 +286,12 @@ public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertThat((Double) metric.metricValue(), greaterThan(0.0)); - verify(innerStoreMock); } @Test public void shouldBackwardFetchFromInnerStoreAndRecordFetchMetrics() { - expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)) - .andReturn(KeyValueIterators.emptyIterator()); - replay(innerStoreMock); + when(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)) + .thenReturn(KeyValueIterators.emptyIterator()); store.init((StateStoreContext) context, store); store.backwardFetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; @@ -322,20 +300,18 @@ public void shouldBackwardFetchFromInnerStoreAndRecordFetchMetrics() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertThat((Double) metric.metricValue(), greaterThan(0.0)); - verify(innerStoreMock); } @Test public void shouldBackwardFetchRangeFromInnerStoreAndRecordFetchMetrics() { - expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)) - .andReturn(KeyValueIterators.emptyIterator()); - expect(innerStoreMock.backwardFetch(null, Bytes.wrap("b".getBytes()), 1, 1)) - .andReturn(KeyValueIterators.emptyIterator()); - expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), null, 1, 1)) - .andReturn(KeyValueIterators.emptyIterator()); - expect(innerStoreMock.backwardFetch(null, null, 1, 1)) - .andReturn(KeyValueIterators.emptyIterator()); - replay(innerStoreMock); + when(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)) + .thenReturn(KeyValueIterators.emptyIterator()); + when(innerStoreMock.backwardFetch(null, Bytes.wrap("b".getBytes()), 1, 1)) + .thenReturn(KeyValueIterators.emptyIterator()); + when(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), null, 1, 1)) + .thenReturn(KeyValueIterators.emptyIterator()); + when(innerStoreMock.backwardFetch(null, null, 1, 1)) + .thenReturn(KeyValueIterators.emptyIterator()); store.init((StateStoreContext) context, store); store.backwardFetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; @@ -347,13 +323,11 @@ public void shouldBackwardFetchRangeFromInnerStoreAndRecordFetchMetrics() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertThat((Double) metric.metricValue(), greaterThan(0.0)); - verify(innerStoreMock); } @Test public void shouldFetchAllFromInnerStoreAndRecordFetchMetrics() { - expect(innerStoreMock.fetchAll(1, 1)).andReturn(KeyValueIterators.emptyIterator()); - replay(innerStoreMock); + when(innerStoreMock.fetchAll(1, 1)).thenReturn(KeyValueIterators.emptyIterator()); store.init((StateStoreContext) context, store); store.fetchAll(ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; @@ -362,13 +336,11 @@ public void shouldFetchAllFromInnerStoreAndRecordFetchMetrics() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertThat((Double) metric.metricValue(), greaterThan(0.0)); - verify(innerStoreMock); } @Test public void shouldBackwardFetchAllFromInnerStoreAndRecordFetchMetrics() { - expect(innerStoreMock.backwardFetchAll(1, 1)).andReturn(KeyValueIterators.emptyIterator()); - replay(innerStoreMock); + when(innerStoreMock.backwardFetchAll(1, 1)).thenReturn(KeyValueIterators.emptyIterator()); store.init((StateStoreContext) context, store); store.backwardFetchAll(ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; @@ -377,13 +349,11 @@ public void shouldBackwardFetchAllFromInnerStoreAndRecordFetchMetrics() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("fetch-rate"); assertThat((Double) metric.metricValue(), greaterThan(0.0)); - verify(innerStoreMock); } @Test public void shouldRecordFlushLatency() { - innerStoreMock.flush(); - replay(innerStoreMock); + doNothing().when(innerStoreMock).flush(); store.init((StateStoreContext) context, store); store.flush(); @@ -392,13 +362,11 @@ public void shouldRecordFlushLatency() { // and the sensor is tested elsewhere final KafkaMetric metric = metric("flush-rate"); assertTrue((Double) metric.metricValue() > 0); - verify(innerStoreMock); } @Test public void shouldNotThrowNullPointerExceptionIfFetchReturnsNull() { - expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null); - replay(innerStoreMock); + when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).thenReturn(null); store.init((StateStoreContext) context, store); assertNull(store.fetch("a", 0)); @@ -412,8 +380,7 @@ private interface CachedWindowStore extends WindowStore, CachedSt public void shouldSetFlushListenerOnWrappedCachingStore() { final CachedWindowStore cachedWindowStore = mock(CachedWindowStore.class); - expect(cachedWindowStore.setFlushListener(anyObject(CacheFlushListener.class), eq(false))).andReturn(true); - replay(cachedWindowStore); + when(cachedWindowStore.setFlushListener(any(CacheFlushListener.class), eq(false))).thenReturn(true); final MeteredWindowStore metered = new MeteredWindowStore<>( cachedWindowStore, @@ -424,8 +391,6 @@ public void shouldSetFlushListenerOnWrappedCachingStore() { new SerdeThatDoesntHandleNull() ); assertTrue(metered.setFlushListener(null, false)); - - verify(cachedWindowStore); } @Test @@ -435,40 +400,31 @@ public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() { @Test public void shouldCloseUnderlyingStore() { - innerStoreMock.close(); - expectLastCall(); - replay(innerStoreMock); + doNothing().when(innerStoreMock).close(); store.init((StateStoreContext) context, store); store.close(); - verify(innerStoreMock); } @Test public void shouldRemoveMetricsOnClose() { - innerStoreMock.close(); - expectLastCall(); - replay(innerStoreMock); + doNothing().when(innerStoreMock).close(); store.init((StateStoreContext) context, store); assertThat(storeMetrics(), not(empty())); store.close(); assertThat(storeMetrics(), empty()); - verify(innerStoreMock); } @Test public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() { - innerStoreMock.close(); - expectLastCall().andThrow(new RuntimeException("Oops!")); - replay(innerStoreMock); + doThrow(new RuntimeException("Oops!")).when(innerStoreMock).close(); store.init((StateStoreContext) context, store); // There's always a "count" metric registered assertThat(storeMetrics(), not(empty())); assertThrows(RuntimeException.class, store::close); assertThat(storeMetrics(), empty()); - verify(innerStoreMock); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java index ffb5ab3c53da5..d2b69e974154c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java @@ -21,20 +21,18 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; -import org.easymock.EasyMockRunner; -import org.easymock.Mock; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; -@RunWith(EasyMockRunner.class) +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class ReadOnlyKeyValueStoreFacadeTest { @Mock private TimestampedKeyValueStore mockedKeyValueTimestampStore; @@ -50,66 +48,56 @@ public void setup() { @Test public void shouldReturnPlainValueOnGet() { - expect(mockedKeyValueTimestampStore.get("key")) - .andReturn(ValueAndTimestamp.make("value", 42L)); - expect(mockedKeyValueTimestampStore.get("unknownKey")) - .andReturn(null); - replay(mockedKeyValueTimestampStore); + when(mockedKeyValueTimestampStore.get("key")) + .thenReturn(ValueAndTimestamp.make("value", 42L)); + when(mockedKeyValueTimestampStore.get("unknownKey")) + .thenReturn(null); assertThat(readOnlyKeyValueStoreFacade.get("key"), is("value")); assertNull(readOnlyKeyValueStoreFacade.get("unknownKey")); - verify(mockedKeyValueTimestampStore); } @Test public void shouldReturnPlainKeyValuePairsForRangeIterator() { - expect(mockedKeyValueTimestampIterator.next()) - .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 21L))) - .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 42L))); - expect(mockedKeyValueTimestampStore.range("key1", "key2")).andReturn(mockedKeyValueTimestampIterator); - replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore); + when(mockedKeyValueTimestampIterator.next()) + .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 21L))) + .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 42L))); + when(mockedKeyValueTimestampStore.range("key1", "key2")).thenReturn(mockedKeyValueTimestampIterator); final KeyValueIterator iterator = readOnlyKeyValueStoreFacade.range("key1", "key2"); assertThat(iterator.next(), is(KeyValue.pair("key1", "value1"))); assertThat(iterator.next(), is(KeyValue.pair("key2", "value2"))); - verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore); } @Test public void shouldReturnPlainKeyValuePairsForPrefixScan() { final StringSerializer stringSerializer = new StringSerializer(); - expect(mockedKeyValueTimestampIterator.next()) - .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 21L))) - .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 42L))); - expect(mockedKeyValueTimestampStore.prefixScan("key", stringSerializer)).andReturn(mockedKeyValueTimestampIterator); - replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore); + when(mockedKeyValueTimestampIterator.next()) + .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 21L))) + .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 42L))); + when(mockedKeyValueTimestampStore.prefixScan("key", stringSerializer)).thenReturn(mockedKeyValueTimestampIterator); final KeyValueIterator iterator = readOnlyKeyValueStoreFacade.prefixScan("key", stringSerializer); assertThat(iterator.next(), is(KeyValue.pair("key1", "value1"))); assertThat(iterator.next(), is(KeyValue.pair("key2", "value2"))); - verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore); } @Test public void shouldReturnPlainKeyValuePairsForAllIterator() { - expect(mockedKeyValueTimestampIterator.next()) - .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 21L))) - .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 42L))); - expect(mockedKeyValueTimestampStore.all()).andReturn(mockedKeyValueTimestampIterator); - replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore); + when(mockedKeyValueTimestampIterator.next()) + .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 21L))) + .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 42L))); + when(mockedKeyValueTimestampStore.all()).thenReturn(mockedKeyValueTimestampIterator); final KeyValueIterator iterator = readOnlyKeyValueStoreFacade.all(); assertThat(iterator.next(), is(KeyValue.pair("key1", "value1"))); assertThat(iterator.next(), is(KeyValue.pair("key2", "value2"))); - verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore); } @Test public void shouldForwardApproximateNumEntries() { - expect(mockedKeyValueTimestampStore.approximateNumEntries()).andReturn(42L); - replay(mockedKeyValueTimestampStore); + when(mockedKeyValueTimestampStore.approximateNumEntries()).thenReturn(42L); assertThat(readOnlyKeyValueStoreFacade.approximateNumEntries(), is(42L)); - verify(mockedKeyValueTimestampStore); } }