Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Refactor process rate and latency metrics on thread-level #8172

Merged
merged 2 commits into from
Mar 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,8 @@ public static Sensor pollSensor(final String threadId,

public static Sensor processLatencySensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.threadLevelSensor(threadId,
PROCESS + LATENCY_SUFFIX,
RecordingLevel.INFO);
final Sensor sensor =
streamsMetrics.threadLevelSensor(threadId, PROCESS + LATENCY_SUFFIX, RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId);
final String threadLevelGroup = threadLevelGroup(streamsMetrics);
addAvgAndMaxToSensor(
Expand All @@ -167,10 +166,9 @@ public static Sensor processLatencySensor(final String threadId,
}

public static Sensor processRateSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.threadLevelSensor(threadId,
PROCESS + RATE_SUFFIX,
RecordingLevel.INFO);
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor =
streamsMetrics.threadLevelSensor(threadId, PROCESS + RATE_SUFFIX, RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId);
final String threadLevelGroup = threadLevelGroup(streamsMetrics);
addRateOfSumAndSumMetricsToSensor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.metrics;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
Expand All @@ -35,9 +34,9 @@
import java.util.Collections;
import java.util.Map;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -78,6 +77,7 @@ public static Collection<Object[]> data() {
@Before
public void setUp() {
expect(streamsMetrics.version()).andReturn(builtInMetricsVersion).anyTimes();
mockStatic(StreamsMetricsImpl.class);
}

@Test
Expand All @@ -87,7 +87,6 @@ public void shouldGetCreateTaskSensor() {
final String rateDescription = "The average per-second number of newly created tasks";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
threadLevelGroup,
Expand All @@ -111,7 +110,6 @@ public void shouldGetCloseTaskSensor() {
final String rateDescription = "The average per-second number of closed tasks";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
threadLevelGroup,
Expand All @@ -138,7 +136,6 @@ public void shouldGetCommitSensor() {
final String maxLatencyDescription = "The maximum commit latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
threadLevelGroup,
Expand Down Expand Up @@ -172,7 +169,6 @@ public void shouldGetPollSensor() {
final String maxLatencyDescription = "The maximum poll latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
threadLevelGroup,
Expand All @@ -199,22 +195,19 @@ public void shouldGetPollSensor() {

@Test
public void shouldGetProcessLatencySensor() {
expect(streamsMetrics.threadLevelSensor(THREAD_ID, "process-latency", RecordingLevel.INFO))
.andReturn(expectedSensor);
final String operationLatency = "process" + LATENCY_SUFFIX;
final String avgLatencyDescription = "The average process latency";
final String maxLatencyDescription = "The maximum process latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operationLatency, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
expect(expectedSensor.add(eq(new MetricName(
"process-latency-avg",
threadLevelGroup,
"The average execution time in ms for processing, across all running tasks of this thread.",
tagMap
)), anyObject())).andReturn(true);

expect(expectedSensor.add(eq(new MetricName(
"process-latency-max",
StreamsMetricsImpl.addAvgAndMaxToSensor(
expectedSensor,
threadLevelGroup,
"The maximum execution time in ms for processing across all running tasks of this thread.",
tagMap
)), anyObject())).andReturn(true);
tagMap,
operationLatency,
avgLatencyDescription,
maxLatencyDescription
);
replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor);

final Sensor sensor = ThreadMetrics.processLatencySensor(THREAD_ID, streamsMetrics);
Expand All @@ -225,22 +218,20 @@ public void shouldGetProcessLatencySensor() {

@Test
public void shouldGetProcessRateSensor() {
expect(streamsMetrics.threadLevelSensor(THREAD_ID, "process-rate", RecordingLevel.INFO))
.andReturn(expectedSensor);
final String operation = "process";
final String operationRate = "process" + RATE_SUFFIX;
final String totalDescription = "The total number of calls to process";
final String rateDescription = "The average per-second number of calls to process";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operationRate, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
expect(expectedSensor.add(eq(new MetricName(
"process-rate",
threadLevelGroup,
"The average per-second number of calls to process",
tagMap
)), anyObject())).andReturn(true);

expect(expectedSensor.add(eq(new MetricName(
"process-total",
StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor(
expectedSensor,
threadLevelGroup,
"The total number of calls to process",
tagMap
)), anyObject())).andReturn(true);
tagMap,
operation,
rateDescription,
totalDescription
);
replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor);

final Sensor sensor = ThreadMetrics.processRateSensor(THREAD_ID, streamsMetrics);
Expand All @@ -259,13 +250,13 @@ public void shouldGetPunctuateSensor() {
final String maxLatencyDescription = "The maximum punctuate latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
threadLevelGroup,
tagMap,
operation,
rateDescription, totalDescription
rateDescription,
totalDescription
);
StreamsMetricsImpl.addAvgAndMaxToSensor(
expectedSensor,
Expand All @@ -291,7 +282,6 @@ public void shouldGetSkipRecordSensor() {
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO))
.andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
threadLevelGroup,
Expand Down Expand Up @@ -322,7 +312,6 @@ public void shouldGetCommitOverTasksSensor() {
"The maximum commit latency over all tasks assigned to one stream thread";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.DEBUG)).andReturn(expectedSensor);
expect(streamsMetrics.taskLevelTagMap(THREAD_ID, ROLLUP_VALUE)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
TASK_LEVEL_GROUP,
Expand Down