From b5698b571a7ef97ca19425607e6ee8f691e5d182 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Wed, 26 Feb 2020 19:13:25 +0100 Subject: [PATCH 1/2] MINOR: Refactor process rate and latency metrics on thread-level --- .../processor/internals/StreamThread.java | 2 +- .../internals/metrics/ThreadMetrics.java | 15 ++-- .../internals/metrics/ThreadMetricsTest.java | 68 ++++++++----------- 3 files changed, 35 insertions(+), 50 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 4dcad01c3143d..bd8049150bec7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -642,7 +642,7 @@ public StreamThread(final Time time, this.commitSensor = ThreadMetrics.commitSensor(threadId, streamsMetrics); this.pollSensor = ThreadMetrics.pollSensor(threadId, streamsMetrics); this.processLatencySensor = ThreadMetrics.processLatencySensor(threadId, streamsMetrics); - this.processRateSensor = ThreadMetrics.processRateSensor(threadId, streamsMetrics); + this.processRateSensor = ThreadMetrics.processSensor(threadId, streamsMetrics); this.punctuateSensor = ThreadMetrics.punctuateSensor(threadId, streamsMetrics); // The following sensors are created here but their references are not stored in this object, since within diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java index be813bed2de1b..59181438fa974 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java @@ -24,7 +24,6 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; @@ -150,9 +149,8 @@ public static Sensor pollSensor(final String threadId, public static Sensor processLatencySensor(final String threadId, final StreamsMetricsImpl streamsMetrics) { - final Sensor sensor = streamsMetrics.threadLevelSensor(threadId, - PROCESS + LATENCY_SUFFIX, - RecordingLevel.INFO); + final Sensor sensor = + streamsMetrics.threadLevelSensor(threadId, PROCESS + LATENCY_SUFFIX, RecordingLevel.INFO); final Map tagMap = streamsMetrics.threadLevelTagMap(threadId); final String threadLevelGroup = threadLevelGroup(streamsMetrics); addAvgAndMaxToSensor( @@ -166,11 +164,10 @@ public static Sensor processLatencySensor(final String threadId, return sensor; } - public static Sensor processRateSensor(final String threadId, - final StreamsMetricsImpl streamsMetrics) { - final Sensor sensor = streamsMetrics.threadLevelSensor(threadId, - PROCESS + RATE_SUFFIX, - RecordingLevel.INFO); + public static Sensor processSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + final Sensor sensor = + streamsMetrics.threadLevelSensor(threadId, PROCESS, RecordingLevel.INFO); final Map tagMap = streamsMetrics.threadLevelTagMap(threadId); final String threadLevelGroup = threadLevelGroup(streamsMetrics); addRateOfSumAndSumMetricsToSensor( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java index 417965226333b..625f1c0b4b087 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals.metrics; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version; @@ -35,9 +34,8 @@ import java.util.Collections; import java.util.Map; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.mock; import static org.hamcrest.CoreMatchers.is; @@ -78,6 +76,7 @@ public static Collection data() { @Before public void setUp() { expect(streamsMetrics.version()).andReturn(builtInMetricsVersion).anyTimes(); + mockStatic(StreamsMetricsImpl.class); } @Test @@ -87,7 +86,6 @@ public void shouldGetCreateTaskSensor() { final String rateDescription = "The average per-second number of newly created tasks"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); - mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, threadLevelGroup, @@ -111,7 +109,6 @@ public void shouldGetCloseTaskSensor() { final String rateDescription = "The average per-second number of closed tasks"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); - mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, threadLevelGroup, @@ -138,7 +135,6 @@ public void shouldGetCommitSensor() { final String maxLatencyDescription = "The maximum commit latency"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); - mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, threadLevelGroup, @@ -172,7 +168,6 @@ public void shouldGetPollSensor() { final String maxLatencyDescription = "The maximum poll latency"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); - mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, threadLevelGroup, @@ -199,22 +194,20 @@ public void shouldGetPollSensor() { @Test public void shouldGetProcessLatencySensor() { - expect(streamsMetrics.threadLevelSensor(THREAD_ID, "process-latency", RecordingLevel.INFO)) - .andReturn(expectedSensor); + final String operation = "process"; + final String operationLatency = operation + LATENCY_SUFFIX; + final String avgLatencyDescription = "The average process latency"; + final String maxLatencyDescription = "The maximum process latency"; + expect(streamsMetrics.threadLevelSensor(THREAD_ID, operationLatency, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); - expect(expectedSensor.add(eq(new MetricName( - "process-latency-avg", - threadLevelGroup, - "The average execution time in ms for processing, across all running tasks of this thread.", - tagMap - )), anyObject())).andReturn(true); - - expect(expectedSensor.add(eq(new MetricName( - "process-latency-max", + StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, threadLevelGroup, - "The maximum execution time in ms for processing across all running tasks of this thread.", - tagMap - )), anyObject())).andReturn(true); + tagMap, + operationLatency, + avgLatencyDescription, + maxLatencyDescription + ); replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor); final Sensor sensor = ThreadMetrics.processLatencySensor(THREAD_ID, streamsMetrics); @@ -225,25 +218,22 @@ public void shouldGetProcessLatencySensor() { @Test public void shouldGetProcessRateSensor() { - expect(streamsMetrics.threadLevelSensor(THREAD_ID, "process-rate", RecordingLevel.INFO)) - .andReturn(expectedSensor); + final String operation = "process"; + final String totalDescription = "The total number of calls to process"; + final String rateDescription = "The average per-second number of calls to process"; + expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); - expect(expectedSensor.add(eq(new MetricName( - "process-rate", - threadLevelGroup, - "The average per-second number of calls to process", - tagMap - )), anyObject())).andReturn(true); - - expect(expectedSensor.add(eq(new MetricName( - "process-total", + StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor( + expectedSensor, threadLevelGroup, - "The total number of calls to process", - tagMap - )), anyObject())).andReturn(true); + tagMap, + operation, + rateDescription, + totalDescription + ); replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor); - final Sensor sensor = ThreadMetrics.processRateSensor(THREAD_ID, streamsMetrics); + final Sensor sensor = ThreadMetrics.processSensor(THREAD_ID, streamsMetrics); verify(StreamsMetricsImpl.class, streamsMetrics); assertThat(sensor, is(expectedSensor)); @@ -259,13 +249,13 @@ public void shouldGetPunctuateSensor() { final String maxLatencyDescription = "The maximum punctuate latency"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); - mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, threadLevelGroup, tagMap, operation, - rateDescription, totalDescription + rateDescription, + totalDescription ); StreamsMetricsImpl.addAvgAndMaxToSensor( expectedSensor, @@ -291,7 +281,6 @@ public void shouldGetSkipRecordSensor() { expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)) .andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); - mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, threadLevelGroup, @@ -322,7 +311,6 @@ public void shouldGetCommitOverTasksSensor() { "The maximum commit latency over all tasks assigned to one stream thread"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.DEBUG)).andReturn(expectedSensor); expect(streamsMetrics.taskLevelTagMap(THREAD_ID, ROLLUP_VALUE)).andReturn(tagMap); - mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, TASK_LEVEL_GROUP, From 34a117d91588b0bc8929713e681870c352942834 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Wed, 26 Feb 2020 21:47:01 +0100 Subject: [PATCH 2/2] Rename sensor that measures rate and amount of records processed --- .../kafka/streams/processor/internals/StreamThread.java | 2 +- .../processor/internals/metrics/ThreadMetrics.java | 7 ++++--- .../processor/internals/metrics/ThreadMetricsTest.java | 9 +++++---- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index bd8049150bec7..4dcad01c3143d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -642,7 +642,7 @@ public StreamThread(final Time time, this.commitSensor = ThreadMetrics.commitSensor(threadId, streamsMetrics); this.pollSensor = ThreadMetrics.pollSensor(threadId, streamsMetrics); this.processLatencySensor = ThreadMetrics.processLatencySensor(threadId, streamsMetrics); - this.processRateSensor = ThreadMetrics.processSensor(threadId, streamsMetrics); + this.processRateSensor = ThreadMetrics.processRateSensor(threadId, streamsMetrics); this.punctuateSensor = ThreadMetrics.punctuateSensor(threadId, streamsMetrics); // The following sensors are created here but their references are not stored in this object, since within diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java index 59181438fa974..d87856b95a8b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java @@ -24,6 +24,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; @@ -164,10 +165,10 @@ public static Sensor processLatencySensor(final String threadId, return sensor; } - public static Sensor processSensor(final String threadId, - final StreamsMetricsImpl streamsMetrics) { + public static Sensor processRateSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { final Sensor sensor = - streamsMetrics.threadLevelSensor(threadId, PROCESS, RecordingLevel.INFO); + streamsMetrics.threadLevelSensor(threadId, PROCESS + RATE_SUFFIX, RecordingLevel.INFO); final Map tagMap = streamsMetrics.threadLevelTagMap(threadId); final String threadLevelGroup = threadLevelGroup(streamsMetrics); addRateOfSumAndSumMetricsToSensor( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java index 625f1c0b4b087..d7059a87fd886 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java @@ -35,6 +35,7 @@ import java.util.Map; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.mock; @@ -194,8 +195,7 @@ public void shouldGetPollSensor() { @Test public void shouldGetProcessLatencySensor() { - final String operation = "process"; - final String operationLatency = operation + LATENCY_SUFFIX; + final String operationLatency = "process" + LATENCY_SUFFIX; final String avgLatencyDescription = "The average process latency"; final String maxLatencyDescription = "The maximum process latency"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operationLatency, RecordingLevel.INFO)).andReturn(expectedSensor); @@ -219,9 +219,10 @@ public void shouldGetProcessLatencySensor() { @Test public void shouldGetProcessRateSensor() { final String operation = "process"; + final String operationRate = "process" + RATE_SUFFIX; final String totalDescription = "The total number of calls to process"; final String rateDescription = "The average per-second number of calls to process"; - expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); + expect(streamsMetrics.threadLevelSensor(THREAD_ID, operationRate, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor( expectedSensor, @@ -233,7 +234,7 @@ public void shouldGetProcessRateSensor() { ); replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor); - final Sensor sensor = ThreadMetrics.processSensor(THREAD_ID, streamsMetrics); + final Sensor sensor = ThreadMetrics.processRateSensor(THREAD_ID, streamsMetrics); verify(StreamsMetricsImpl.class, streamsMetrics); assertThat(sensor, is(expectedSensor));