diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index 176966f827d3c..654f2e6b10ee1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -30,14 +30,11 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ImmutableMetricValue; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version; import org.apache.kafka.test.StreamsTestUtils; -import org.easymock.Capture; -import org.easymock.CaptureType; -import org.easymock.EasyMock; -import org.easymock.IArgumentMatcher; import org.junit.Test; import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; +import org.mockito.junit.MockitoJUnitRunner; import java.time.Duration; import java.util.Arrays; @@ -63,17 +60,6 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.anyString; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.mock; -import static org.easymock.EasyMock.newCapture; -import static org.easymock.EasyMock.niceMock; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.resetToDefault; -import static org.easymock.EasyMock.verify; -import static org.easymock.EasyMock.eq; import static org.hamcrest.CoreMatchers.equalToObject; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; @@ -87,10 +73,15 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; -import static org.powermock.api.easymock.PowerMock.createMock; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({Sensor.class, KafkaMetric.class}) +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class StreamsMetricsImplTest { private final static String SENSOR_PREFIX_DELIMITER = "."; @@ -147,68 +138,59 @@ public class StreamsMetricsImplTest { private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); private static MetricConfig eqMetricConfig(final MetricConfig metricConfig) { - EasyMock.reportMatcher(new IArgumentMatcher() { - private final StringBuffer message = new StringBuffer(); - - @Override - public boolean matches(final Object argument) { - if (argument instanceof MetricConfig) { - final MetricConfig otherMetricConfig = (MetricConfig) argument; - final boolean equalsComparisons = + final StringBuffer message = new StringBuffer(); + + argThat((ArgumentMatcher) argument -> { + if (argument instanceof MetricConfig) { + final MetricConfig otherMetricConfig = argument; + final boolean equalsComparisons = (otherMetricConfig.quota() == metricConfig.quota() || - otherMetricConfig.quota().equals(metricConfig.quota())) && - otherMetricConfig.tags().equals(metricConfig.tags()); - if (otherMetricConfig.eventWindow() == metricConfig.eventWindow() && + otherMetricConfig.quota().equals(metricConfig.quota())) && + otherMetricConfig.tags().equals(metricConfig.tags()); + if (otherMetricConfig.eventWindow() == metricConfig.eventWindow() && otherMetricConfig.recordLevel() == metricConfig.recordLevel() && equalsComparisons && otherMetricConfig.samples() == metricConfig.samples() && otherMetricConfig.timeWindowMs() == metricConfig.timeWindowMs()) { - return true; - } else { - message.append("{ "); - message.append("eventWindow="); - message.append(otherMetricConfig.eventWindow()); - message.append(", "); - message.append("recordLevel="); - message.append(otherMetricConfig.recordLevel()); - message.append(", "); - message.append("quota="); - message.append(otherMetricConfig.quota().toString()); - message.append(", "); - message.append("samples="); - message.append(otherMetricConfig.samples()); - message.append(", "); - message.append("tags="); - message.append(otherMetricConfig.tags().toString()); - message.append(", "); - message.append("timeWindowMs="); - message.append(otherMetricConfig.timeWindowMs()); - message.append(" }"); - } + return true; + } else { + message.append("{ "); + message.append("eventWindow="); + message.append(otherMetricConfig.eventWindow()); + message.append(", "); + message.append("recordLevel="); + message.append(otherMetricConfig.recordLevel()); + message.append(", "); + message.append("quota="); + message.append(otherMetricConfig.quota().toString()); + message.append(", "); + message.append("samples="); + message.append(otherMetricConfig.samples()); + message.append(", "); + message.append("tags="); + message.append(otherMetricConfig.tags().toString()); + message.append(", "); + message.append("timeWindowMs="); + message.append(otherMetricConfig.timeWindowMs()); + message.append(" }"); } - message.append("not a MetricConfig object"); - return false; - } - - @Override - public void appendTo(final StringBuffer buffer) { - buffer.append(message); } + message.append("not a MetricConfig object"); + return false; }); return null; } - private Capture addSensorsOnAllLevels(final Metrics metrics, final StreamsMetricsImpl streamsMetrics) { - final Capture sensorKeys = newCapture(CaptureType.ALL); + private ArgumentCaptor addSensorsOnAllLevels(final Metrics metrics, final StreamsMetricsImpl streamsMetrics) { + final ArgumentCaptor sensorKeys = ArgumentCaptor.forClass(String.class); final Sensor[] parents = {}; - expect(metrics.sensor(capture(sensorKeys), eq(INFO_RECORDING_LEVEL), parents)) - .andStubReturn(sensor); - expect(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) - .andReturn(metricName1); - expect(metrics.metricName(METRIC_NAME2, CLIENT_LEVEL_GROUP, DESCRIPTION2, clientLevelTags)) - .andReturn(metricName2); - replay(metrics); + when(metrics.sensor(sensorKeys.capture(), eq(INFO_RECORDING_LEVEL), parents)) + .thenReturn(sensor); + when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) + .thenReturn(metricName1); + when(metrics.metricName(METRIC_NAME2, CLIENT_LEVEL_GROUP, DESCRIPTION2, clientLevelTags)) + .thenReturn(metricName2); streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, DESCRIPTION1, INFO_RECORDING_LEVEL, "value"); streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME2, DESCRIPTION2, INFO_RECORDING_LEVEL, "value"); streamsMetrics.clientLevelSensor(SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -250,19 +232,17 @@ private Capture addSensorsOnAllLevels(final Metrics metrics, final Strea return sensorKeys; } - private Capture setupGetNewSensorTest(final Metrics metrics, + private ArgumentCaptor setupGetNewSensorTest(final Metrics metrics, final RecordingLevel recordingLevel) { - final Capture sensorKey = newCapture(CaptureType.ALL); - expect(metrics.getSensor(capture(sensorKey))).andStubReturn(null); + final ArgumentCaptor sensorKey = ArgumentCaptor.forClass(String.class); + when(metrics.getSensor(sensorKey.capture())).thenReturn(null); final Sensor[] parents = {}; - expect(metrics.sensor(capture(sensorKey), eq(recordingLevel), parents)).andReturn(sensor); - replay(metrics); + when(metrics.sensor(sensorKey.capture(), eq(INFO_RECORDING_LEVEL), parents)).thenReturn(sensor); return sensorKey; } private void setupGetExistingSensorTest(final Metrics metrics) { - expect(metrics.getSensor(anyString())).andStubReturn(sensor); - replay(metrics); + when(metrics.getSensor(anyString())).thenReturn(sensor); } @Test @@ -274,7 +254,6 @@ public void shouldGetNewThreadLevelSensor() { final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @@ -287,7 +266,6 @@ public void shouldGetExistingThreadLevelSensor() { final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @@ -305,7 +283,6 @@ public void shouldGetNewTaskLevelSensor() { recordingLevel ); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @@ -323,7 +300,6 @@ public void shouldGetExistingTaskLevelSensor() { recordingLevel ); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @@ -343,7 +319,6 @@ public void shouldGetNewTopicLevelSensor() { recordingLevel ); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @@ -363,7 +338,6 @@ public void shouldGetExistingTopicLevelSensor() { recordingLevel ); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @@ -371,7 +345,7 @@ public void shouldGetExistingTopicLevelSensor() { public void shouldGetNewStoreLevelSensorIfNoneExists() { final Metrics metrics = mock(Metrics.class); final RecordingLevel recordingLevel = RecordingLevel.INFO; - final Capture sensorKeys = setupGetNewSensorTest(metrics, recordingLevel); + final ArgumentCaptor sensorKeys = setupGetNewSensorTest(metrics, recordingLevel); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); final Sensor actualSensor = streamsMetrics.storeLevelSensor( @@ -381,9 +355,8 @@ public void shouldGetNewStoreLevelSensorIfNoneExists() { recordingLevel ); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); - assertThat(sensorKeys.getValues().get(0), is(sensorKeys.getValues().get(1))); + assertThat(sensorKeys.getAllValues().get(0), is(sensorKeys.getAllValues().get(1))); } @Test @@ -400,50 +373,49 @@ public void shouldGetExistingStoreLevelSensor() { recordingLevel ); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @Test public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() { - final Metrics metrics = niceMock(Metrics.class); - final Capture sensorKeys = setUpSensorKeyTests(metrics); + final Metrics metrics = mock(Metrics.class); + final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, INFO_RECORDING_LEVEL); - assertThat(sensorKeys.getValues().get(0), not(sensorKeys.getValues().get(1))); + assertThat(sensorKeys.getAllValues().get(0), not(sensorKeys.getAllValues().get(1))); } @Test public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() { - final Metrics metrics = niceMock(Metrics.class); - final Capture sensorKeys = setUpSensorKeyTests(metrics); + final Metrics metrics = mock(Metrics.class); + final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); - assertThat(sensorKeys.getValues().get(0), not(sensorKeys.getValues().get(1))); + assertThat(sensorKeys.getAllValues().get(0), not(sensorKeys.getAllValues().get(1))); } @Test public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() { - final Metrics metrics = niceMock(Metrics.class); - final Capture sensorKeys = setUpSensorKeyTests(metrics); + final Metrics metrics = mock(Metrics.class); + final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, INFO_RECORDING_LEVEL); - assertThat(sensorKeys.getValues().get(0), not(sensorKeys.getValues().get(1))); + assertThat(sensorKeys.getAllValues().get(0), not(sensorKeys.getAllValues().get(1))); } @Test public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws InterruptedException { - final Metrics metrics = niceMock(Metrics.class); - final Capture sensorKeys = setUpSensorKeyTests(metrics); + final Metrics metrics = mock(Metrics.class); + final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); @@ -452,25 +424,24 @@ public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws I otherThread.start(); otherThread.join(); - assertThat(sensorKeys.getValues().get(0), not(sensorKeys.getValues().get(1))); + assertThat(sensorKeys.getAllValues().get(0), not(sensorKeys.getAllValues().get(1))); } @Test public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() { - final Metrics metrics = niceMock(Metrics.class); - final Capture sensorKeys = setUpSensorKeyTests(metrics); + final Metrics metrics = mock(Metrics.class); + final ArgumentCaptor sensorKeys = setUpSensorKeyTests(metrics); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); streamsMetrics.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL); - assertThat(sensorKeys.getValues().get(0), is(sensorKeys.getValues().get(1))); + assertThat(sensorKeys.getAllValues().get(0), is(sensorKeys.getAllValues().get(1))); } - private Capture setUpSensorKeyTests(final Metrics metrics) { - final Capture sensorKeys = newCapture(CaptureType.ALL); - expect(metrics.getSensor(capture(sensorKeys))).andStubReturn(sensor); - replay(metrics); + private ArgumentCaptor setUpSensorKeyTests(final Metrics metrics) { + final ArgumentCaptor sensorKeys = ArgumentCaptor.forClass(String.class); + when(metrics.getSensor(sensorKeys.capture())).thenReturn(sensor); return sensorKeys; } @@ -480,11 +451,10 @@ public void shouldAddNewStoreLevelMutableMetric() { final MetricName metricName = new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL); - expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP)) - .andReturn(metricName); - expect(metrics.metric(metricName)).andReturn(null); - expect(metrics.addMetricIfAbsent(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).andReturn(null); - replay(metrics); + when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP)) + .thenReturn(metricName); + when(metrics.metric(metricName)).thenReturn(null); + when(metrics.addMetricIfAbsent(eq(metricName), eqMetricConfig(metricConfig), eq(VALUE_PROVIDER))).thenReturn(null); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); streamsMetrics.addStoreLevelMutableMetric( @@ -496,8 +466,6 @@ public void shouldAddNewStoreLevelMutableMetric() { INFO_RECORDING_LEVEL, VALUE_PROVIDER ); - - verify(metrics); } @Test @@ -516,10 +484,9 @@ public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() { final Metrics metrics = mock(Metrics.class); final MetricName metricName = new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); - expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP)) - .andReturn(metricName); - expect(metrics.metric(metricName)).andReturn(mock(KafkaMetric.class)); - replay(metrics); + when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP)) + .thenReturn(metricName); + when(metrics.metric(metricName)).thenReturn(null); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); streamsMetrics.addStoreLevelMutableMetric( @@ -531,8 +498,6 @@ public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() { INFO_RECORDING_LEVEL, VALUE_PROVIDER ); - - verify(metrics); } @Test @@ -569,27 +534,23 @@ public void shouldCreateMetricOnceDuringConcurrentMetricCreationRequest() throws @Test public void shouldRemoveStateStoreLevelSensors() { - final Metrics metrics = niceMock(Metrics.class); + final Metrics metrics = mock(Metrics.class); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); final MetricName metricName1 = new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); final MetricName metricName2 = new MetricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, STORE_LEVEL_TAG_MAP); - expect(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP)) - .andReturn(metricName1); - expect(metrics.metricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, STORE_LEVEL_TAG_MAP)) - .andReturn(metricName2); - final Capture sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics); - resetToDefault(metrics); - metrics.removeSensor(sensorKeys.getValues().get(6)); - metrics.removeSensor(sensorKeys.getValues().get(7)); - expect(metrics.removeMetric(metricName1)).andReturn(mock(KafkaMetric.class)); - expect(metrics.removeMetric(metricName2)).andReturn(mock(KafkaMetric.class)); - replay(metrics); + when(metrics.metricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP)) + .thenReturn(metricName1); + when(metrics.metricName(METRIC_NAME2, STATE_STORE_LEVEL_GROUP, DESCRIPTION2, STORE_LEVEL_TAG_MAP)) + .thenReturn(metricName2); + final ArgumentCaptor sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics); + doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(6)); + doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(7)); + when(metrics.removeMetric(metricName1)).thenReturn(null); + when(metrics.removeMetric(metricName2)).thenReturn(null); streamsMetrics.removeAllStoreLevelSensorsAndMetrics(TASK_ID1, STORE_NAME1); - - verify(metrics); } @Test @@ -607,7 +568,6 @@ public void shouldGetNewNodeLevelSensor() { recordingLevel ); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @@ -626,7 +586,6 @@ public void shouldGetExistingNodeLevelSensor() { recordingLevel ); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @@ -646,7 +605,6 @@ public void shouldGetNewCacheLevelSensor() { recordingLevel ); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @@ -665,7 +623,6 @@ public void shouldGetExistingCacheLevelSensor() { recordingLevel ); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @@ -678,7 +635,6 @@ public void shouldGetNewClientLevelSensor() { final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @@ -691,7 +647,6 @@ public void shouldGetExistingClientLevelSensor() { final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel); - verify(metrics); assertThat(actualSensor, is(equalToObject(sensor))); } @@ -702,15 +657,12 @@ public void shouldAddClientLevelImmutableMetric() { final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel); final String value = "immutable-value"; final ImmutableMetricValue immutableValue = new ImmutableMetricValue<>(value); - expect(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) - .andReturn(metricName1); - metrics.addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(immutableValue)); - replay(metrics); + when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) + .thenReturn(metricName1); + doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(immutableValue)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); streamsMetrics.addClientLevelImmutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, value); - - verify(metrics); } @Test @@ -719,15 +671,12 @@ public void shouldAddClientLevelMutableMetric() { final RecordingLevel recordingLevel = RecordingLevel.INFO; final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel); final Gauge valueProvider = (config, now) -> "mutable-value"; - expect(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) - .andReturn(metricName1); - metrics.addMetric(EasyMock.eq(metricName1), eqMetricConfig(metricConfig), eq(valueProvider)); - replay(metrics); + when(metrics.metricName(METRIC_NAME1, CLIENT_LEVEL_GROUP, DESCRIPTION1, clientLevelTags)) + .thenReturn(metricName1); + doNothing().when(metrics).addMetric(eq(metricName1), eqMetricConfig(metricConfig), eq(valueProvider)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); streamsMetrics.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1, recordingLevel, valueProvider); - - verify(metrics); } @Test @@ -739,39 +688,31 @@ public void shouldProvideCorrectStrings() { private void setupRemoveSensorsTest(final Metrics metrics, final String level) { final String fullSensorNamePrefix = INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + level + SENSOR_NAME_DELIMITER; - resetToDefault(metrics); - metrics.removeSensor(fullSensorNamePrefix + SENSOR_NAME_1); - metrics.removeSensor(fullSensorNamePrefix + SENSOR_NAME_2); - replay(metrics); + doNothing().when(metrics).removeSensor(fullSensorNamePrefix + SENSOR_NAME_1); + doNothing().when(metrics).removeSensor(fullSensorNamePrefix + SENSOR_NAME_2); } @Test public void shouldRemoveClientLevelMetricsAndSensors() { - final Metrics metrics = niceMock(Metrics.class); + final Metrics metrics = mock(Metrics.class); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); - final Capture sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics); - resetToDefault(metrics); - - metrics.removeSensor(sensorKeys.getValues().get(0)); - metrics.removeSensor(sensorKeys.getValues().get(1)); - expect(metrics.removeMetric(metricName1)).andStubReturn(null); - expect(metrics.removeMetric(metricName2)).andStubReturn(null); - replay(metrics); - streamsMetrics.removeAllClientLevelSensorsAndMetrics(); + final ArgumentCaptor sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics); - verify(metrics); + doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(0)); + doNothing().when(metrics).removeSensor(sensorKeys.getAllValues().get(1)); + when(metrics.removeMetric(metricName1)).thenReturn(null); + when(metrics.removeMetric(metricName2)).thenReturn(null); + streamsMetrics.removeAllClientLevelSensorsAndMetrics(); } @Test public void shouldRemoveThreadLevelSensors() { - final Metrics metrics = niceMock(Metrics.class); + final Metrics metrics = mock(Metrics.class); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); addSensorsOnAllLevels(metrics, streamsMetrics); setupRemoveSensorsTest(metrics, THREAD_ID1); streamsMetrics.removeAllThreadLevelSensors(THREAD_ID1); - - verify(metrics); } @Test @@ -1144,14 +1085,11 @@ public void shouldGetThreadLevelTagMap() { @Test public void shouldAddInvocationRateToSensor() { - final Sensor sensor = createMock(Sensor.class); + final Sensor sensor = mock(Sensor.class); final MetricName expectedMetricName = new MetricName(METRIC_NAME1 + "-rate", group, DESCRIPTION1, tags); - expect(sensor.add(eq(expectedMetricName), anyObject(Rate.class))).andReturn(true); - replay(sensor); + when(sensor.add(eq(expectedMetricName), any(Rate.class))).thenReturn(true); StreamsMetricsImpl.addInvocationRateToSensor(sensor, group, tags, METRIC_NAME1, DESCRIPTION1); - - verify(sensor); } @Test @@ -1293,43 +1231,33 @@ private void verifyMetric(final String name, public void shouldMeasureLatency() { final long startTime = 6; final long endTime = 10; - final Sensor sensor = createMock(Sensor.class); - expect(sensor.shouldRecord()).andReturn(true); - expect(sensor.hasMetrics()).andReturn(true); - sensor.record(endTime - startTime); + final Sensor sensor = mock(Sensor.class); + when(sensor.shouldRecord()).thenReturn(true); + when(sensor.hasMetrics()).thenReturn(true); + doNothing().when(sensor).record(endTime - startTime); final Time time = mock(Time.class); - expect(time.nanoseconds()).andReturn(startTime); - expect(time.nanoseconds()).andReturn(endTime); - replay(sensor, time); + when(time.nanoseconds()).thenReturn(startTime).thenReturn(endTime); StreamsMetricsImpl.maybeMeasureLatency(() -> { }, time, sensor); - - verify(sensor, time); } @Test public void shouldNotMeasureLatencyDueToRecordingLevel() { - final Sensor sensor = createMock(Sensor.class); - expect(sensor.shouldRecord()).andReturn(false); + final Sensor sensor = mock(Sensor.class); + when(sensor.shouldRecord()).thenReturn(false); final Time time = mock(Time.class); - replay(sensor); StreamsMetricsImpl.maybeMeasureLatency(() -> { }, time, sensor); - - verify(sensor); } @Test public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() { - final Sensor sensor = createMock(Sensor.class); - expect(sensor.shouldRecord()).andReturn(true); - expect(sensor.hasMetrics()).andReturn(false); + final Sensor sensor = mock(Sensor.class); + when(sensor.shouldRecord()).thenReturn(true); + when(sensor.hasMetrics()).thenReturn(false); final Time time = mock(Time.class); - replay(sensor); StreamsMetricsImpl.maybeMeasureLatency(() -> { }, time, sensor); - - verify(sensor); } @Test