Skip to content

Commit

Permalink
MINOR: Clean up process rate and latency metrics test (apache#8172)
Browse files Browse the repository at this point in the history
Reviewers: John Roesler <[email protected]>
  • Loading branch information
cadonna authored Mar 2, 2020
1 parent 75268cb commit ea0c027
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,8 @@ public static Sensor pollSensor(final String threadId,

public static Sensor processLatencySensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.threadLevelSensor(threadId,
PROCESS + LATENCY_SUFFIX,
RecordingLevel.INFO);
final Sensor sensor =
streamsMetrics.threadLevelSensor(threadId, PROCESS + LATENCY_SUFFIX, RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId);
final String threadLevelGroup = threadLevelGroup(streamsMetrics);
addAvgAndMaxToSensor(
Expand All @@ -167,10 +166,9 @@ public static Sensor processLatencySensor(final String threadId,
}

public static Sensor processRateSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.threadLevelSensor(threadId,
PROCESS + RATE_SUFFIX,
RecordingLevel.INFO);
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor =
streamsMetrics.threadLevelSensor(threadId, PROCESS + RATE_SUFFIX, RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId);
final String threadLevelGroup = threadLevelGroup(streamsMetrics);
addRateOfSumAndSumMetricsToSensor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.metrics;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
Expand All @@ -35,9 +34,9 @@
import java.util.Collections;
import java.util.Map;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -78,6 +77,7 @@ public static Collection<Object[]> data() {
@Before
public void setUp() {
expect(streamsMetrics.version()).andReturn(builtInMetricsVersion).anyTimes();
mockStatic(StreamsMetricsImpl.class);
}

@Test
Expand All @@ -87,7 +87,6 @@ public void shouldGetCreateTaskSensor() {
final String rateDescription = "The average per-second number of newly created tasks";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
threadLevelGroup,
Expand All @@ -111,7 +110,6 @@ public void shouldGetCloseTaskSensor() {
final String rateDescription = "The average per-second number of closed tasks";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
threadLevelGroup,
Expand All @@ -138,7 +136,6 @@ public void shouldGetCommitSensor() {
final String maxLatencyDescription = "The maximum commit latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
threadLevelGroup,
Expand Down Expand Up @@ -172,7 +169,6 @@ public void shouldGetPollSensor() {
final String maxLatencyDescription = "The maximum poll latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
threadLevelGroup,
Expand All @@ -199,22 +195,19 @@ public void shouldGetPollSensor() {

@Test
public void shouldGetProcessLatencySensor() {
expect(streamsMetrics.threadLevelSensor(THREAD_ID, "process-latency", RecordingLevel.INFO))
.andReturn(expectedSensor);
final String operationLatency = "process" + LATENCY_SUFFIX;
final String avgLatencyDescription = "The average process latency";
final String maxLatencyDescription = "The maximum process latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operationLatency, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
expect(expectedSensor.add(eq(new MetricName(
"process-latency-avg",
threadLevelGroup,
"The average execution time in ms for processing, across all running tasks of this thread.",
tagMap
)), anyObject())).andReturn(true);

expect(expectedSensor.add(eq(new MetricName(
"process-latency-max",
StreamsMetricsImpl.addAvgAndMaxToSensor(
expectedSensor,
threadLevelGroup,
"The maximum execution time in ms for processing across all running tasks of this thread.",
tagMap
)), anyObject())).andReturn(true);
tagMap,
operationLatency,
avgLatencyDescription,
maxLatencyDescription
);
replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor);

final Sensor sensor = ThreadMetrics.processLatencySensor(THREAD_ID, streamsMetrics);
Expand All @@ -225,22 +218,20 @@ public void shouldGetProcessLatencySensor() {

@Test
public void shouldGetProcessRateSensor() {
expect(streamsMetrics.threadLevelSensor(THREAD_ID, "process-rate", RecordingLevel.INFO))
.andReturn(expectedSensor);
final String operation = "process";
final String operationRate = "process" + RATE_SUFFIX;
final String totalDescription = "The total number of calls to process";
final String rateDescription = "The average per-second number of calls to process";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operationRate, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
expect(expectedSensor.add(eq(new MetricName(
"process-rate",
threadLevelGroup,
"The average per-second number of calls to process",
tagMap
)), anyObject())).andReturn(true);

expect(expectedSensor.add(eq(new MetricName(
"process-total",
StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor(
expectedSensor,
threadLevelGroup,
"The total number of calls to process",
tagMap
)), anyObject())).andReturn(true);
tagMap,
operation,
rateDescription,
totalDescription
);
replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor);

final Sensor sensor = ThreadMetrics.processRateSensor(THREAD_ID, streamsMetrics);
Expand All @@ -259,13 +250,13 @@ public void shouldGetPunctuateSensor() {
final String maxLatencyDescription = "The maximum punctuate latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
threadLevelGroup,
tagMap,
operation,
rateDescription, totalDescription
rateDescription,
totalDescription
);
StreamsMetricsImpl.addAvgAndMaxToSensor(
expectedSensor,
Expand All @@ -291,7 +282,6 @@ public void shouldGetSkipRecordSensor() {
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO))
.andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
threadLevelGroup,
Expand Down Expand Up @@ -322,7 +312,6 @@ public void shouldGetCommitOverTasksSensor() {
"The maximum commit latency over all tasks assigned to one stream thread";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.DEBUG)).andReturn(expectedSensor);
expect(streamsMetrics.taskLevelTagMap(THREAD_ID, ROLLUP_VALUE)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
TASK_LEVEL_GROUP,
Expand Down

0 comments on commit ea0c027

Please sign in to comment.