Skip to content

Commit

Permalink
KAFKA-17231 Add missing node latency metrics (#17137)
Browse files Browse the repository at this point in the history
This is the equivalent of #16755 for the share group consumer.

The per-node request-latency-max and request-latency-avg metrics were not being recorded, so they were reported as NaN for the share group consumer.

Reviewers: Apoorv Mittal <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
AndrewJSchofield authored Sep 11, 2024
1 parent 60707a5 commit 0c4ffc6
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ private void handleShareFetchSuccess(Node fetchTarget,
}
}

metricsManager.recordLatency(resp.requestLatencyMs());
metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
} finally {
log.debug("Removing pending request for node {} - success", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
Expand Down Expand Up @@ -629,8 +629,6 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
if (!closeFuture.isDone()) {
closeFuture.complete(null);
}

metricsManager.recordLatency(resp.requestLatencyMs());
} else {
if (!acknowledgeRequestState.sessionHandler.handleResponse(response, resp.requestHeader().apiVersion())) {
// Received a response-level error code.
Expand All @@ -646,7 +644,6 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));

acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error());
metricsManager.recordLatency(resp.requestLatencyMs());
}));
acknowledgeRequestState.processingComplete();
}
Expand Down Expand Up @@ -678,9 +675,10 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
}
acknowledgeRequestState.processingComplete();
metricsManager.recordLatency(resp.requestLatencyMs());
}
}

metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
} finally {
log.debug("Removing pending request for node {} - success", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.metrics.stats.WindowedCount;

public class ShareFetchMetricsManager {
private final Metrics metrics;
private final Sensor throttleTime;
private final Sensor bytesFetched;
private final Sensor recordsFetched;
Expand All @@ -29,6 +30,8 @@ public class ShareFetchMetricsManager {
private final Sensor failedAcknowledgements;

public ShareFetchMetricsManager(Metrics metrics, ShareFetchMetricsRegistry metricsRegistry) {
this.metrics = metrics;

this.bytesFetched = new SensorBuilder(metrics, "bytes-fetched")
.withAvg(metricsRegistry.fetchSizeAvg)
.withMax(metricsRegistry.fetchSizeMax)
Expand Down Expand Up @@ -64,8 +67,14 @@ public Sensor throttleTimeSensor() {
return throttleTime;
}

void recordLatency(long requestLatencyMs) {
void recordLatency(String node, long requestLatencyMs) {
fetchLatency.record(requestLatencyMs);
if (!node.isEmpty()) {
String nodeTimeName = "node-" + node + ".latency";
Sensor nodeRequestTime = metrics.getSensor(nodeTimeName);
if (nodeRequestTime != null)
nodeRequestTime.record(requestLatencyMs);
}
}

void recordBytesFetched(int bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

Expand Down Expand Up @@ -55,14 +59,41 @@ public void tearDown() {

@Test
public void testLatency() {
shareFetchMetricsManager.recordLatency(101);
shareFetchMetricsManager.recordLatency("", 101);
time.sleep(metrics.config().timeWindowMs() + 1);
shareFetchMetricsManager.recordLatency(155);
shareFetchMetricsManager.recordLatency("", 155);

assertEquals(155, (double) getMetric(shareFetchMetricsRegistry.fetchLatencyMax).metricValue(), EPSILON);
assertEquals(128, (double) getMetric(shareFetchMetricsRegistry.fetchLatencyAvg).metricValue(), EPSILON);
}

@Test
public void testNodeLatency() {
String connectionId = "0";
MetricName nodeLatencyAvg = metrics.metricName("request-latency-avg", "group");
MetricName nodeLatencyMax = metrics.metricName("request-latency-max", "group");
registerNodeLatencyMetric(connectionId, nodeLatencyAvg, nodeLatencyMax);

shareFetchMetricsManager.recordLatency(connectionId, 123);
time.sleep(metrics.config().timeWindowMs() + 1);
shareFetchMetricsManager.recordLatency(connectionId, 456);

assertEquals(289.5, metricValue(shareFetchMetricsRegistry.fetchLatencyAvg), EPSILON);
assertEquals(456, metricValue(shareFetchMetricsRegistry.fetchLatencyMax), EPSILON);

assertEquals(289.5, metricValue(nodeLatencyAvg), EPSILON);
assertEquals(456, metricValue(nodeLatencyMax), EPSILON);

// Record metric against another node.
shareFetchMetricsManager.recordLatency("1", 501);

assertEquals(360, metricValue(shareFetchMetricsRegistry.fetchLatencyAvg), EPSILON);
assertEquals(501, metricValue(shareFetchMetricsRegistry.fetchLatencyMax), EPSILON);
// Node specific metric should not be affected.
assertEquals(289.5, metricValue(nodeLatencyAvg), EPSILON);
assertEquals(456, metricValue(nodeLatencyMax), EPSILON);
}

@Test
public void testBytesFetched() {
shareFetchMetricsManager.recordBytesFetched(2);
Expand All @@ -86,4 +117,21 @@ public void testRecordsFetched() {
private KafkaMetric getMetric(MetricNameTemplate name) {
return metrics.metric(metrics.metricInstance(name));
}

private void registerNodeLatencyMetric(String connectionId, MetricName nodeLatencyAvg, MetricName nodeLatencyMax) {
String nodeTimeName = "node-" + connectionId + ".latency";
Sensor nodeRequestTime = metrics.sensor(nodeTimeName);
nodeRequestTime.add(nodeLatencyAvg, new Avg());
nodeRequestTime.add(nodeLatencyMax, new Max());
}

private double metricValue(MetricNameTemplate name) {
MetricName metricName = metrics.metricInstance(name);
return metricValue(metricName);
}

private double metricValue(MetricName metricName) {
KafkaMetric metric = metrics.metric(metricName);
return (Double) metric.metricValue();
}
}

0 comments on commit 0c4ffc6

Please sign in to comment.