Skip to content

Commit

Permalink
Merge pull request apache#1 from vvcephei/upstream_merge_Feb_6
Browse files Browse the repository at this point in the history
Revert "KAFKA-8964: Rename tag client-id for thread-level metrics and below (apache#7429)"
  • Loading branch information
soondenana authored Feb 7, 2020
2 parents baa395a + bbf0057 commit d8a6812
Show file tree
Hide file tree
Showing 23 changed files with 142 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATE_RECORD_DROP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;

public class Sensors {
Expand All @@ -49,7 +50,13 @@ public static Sensor lateRecordDropSensor(final InternalProcessorContext context
StreamsMetricsImpl.addInvocationRateAndCountToSensor(
sensor,
PROCESSOR_NODE_METRICS_GROUP,
metrics.nodeLevelTagMap(threadId, context.taskId().toString(), context.currentNode().name()),
metrics.tagMap(
threadId,
"task-id",
context.taskId().toString(),
PROCESSOR_NODE_ID_TAG,
context.currentNode().name()
),
LATE_RECORD_DROP
);
return sensor;
Expand All @@ -66,7 +73,11 @@ public static Sensor recordLatenessSensor(final InternalProcessorContext context
Sensor.RecordingLevel.DEBUG
);

final Map<String, String> tags = metrics.taskLevelTagMap(threadId, context.taskId().toString());
final Map<String, String> tags = metrics.tagMap(
threadId,
"task-id",
context.taskId().toString()
);
sensor.add(
new MetricName(
"record-lateness-avg",
Expand Down Expand Up @@ -98,8 +109,13 @@ public static Sensor suppressionEmitSensor(final InternalProcessorContext contex
Sensor.RecordingLevel.DEBUG
);

final Map<String, String> tags =
metrics.nodeLevelTagMap(threadId, context.taskId().toString(), context.currentNode().name());
final Map<String, String> tags = metrics.tagMap(
threadId,
"task-id",
context.taskId().toString(),
PROCESSOR_NODE_ID_TAG,
context.currentNode().name()
);

sensor.add(
new MetricName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;

Expand Down Expand Up @@ -185,8 +185,20 @@ private NodeMetrics(final StreamsMetricsImpl metrics, final String processorNode

final String threadId = Thread.currentThread().getName();
final String taskName = context.taskId().toString();
final Map<String, String> tagMap = metrics.nodeLevelTagMap(threadId, context.taskId().toString(), processorNodeName);
final Map<String, String> allTagMap = metrics.nodeLevelTagMap(threadId, context.taskId().toString(), ROLLUP_VALUE);
final Map<String, String> tagMap = metrics.tagMap(
threadId,
"task-id",
context.taskId().toString(),
PROCESSOR_NODE_ID_TAG,
processorNodeName
);
final Map<String, String> allTagMap = metrics.tagMap(
threadId,
"task-id",
context.taskId().toString(),
PROCESSOR_NODE_ID_TAG,
"all"
);

nodeProcessTimeSensor = createTaskAndNodeLatencyAndThroughputSensors(
"process",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ protected static final class TaskMetrics {
final Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, metrics);

// add the operation metrics with additional tags
final Map<String, String> tagMap = metrics.taskLevelTagMap(threadId, taskName);
final Map<String, String> tagMap =
metrics.tagMap(Thread.currentThread().getName(), "task-id", taskName);
taskCommitTimeSensor =
metrics.taskLevelSensor(threadId, taskName, "commit", Sensor.RecordingLevel.DEBUG, parent);
taskCommitTimeSensor.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ public int hashCode() {
public static final String THREAD_ID_TAG_0100_TO_23 = "client-id";
public static final String TASK_ID_TAG = "task-id";
public static final String STORE_ID_TAG = "state-id";
public static final String BUFFER_ID_TAG = "buffer-id";
public static final String RECORD_CACHE_ID_TAG = "record-cache-id";

public static final String ROLLUP_VALUE = "all";
Expand Down Expand Up @@ -238,13 +237,22 @@ public Map<String, String> clientLevelTagMap() {

public Map<String, String> threadLevelTagMap(final String threadId) {
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(version == Version.LATEST ? THREAD_ID_TAG : THREAD_ID_TAG_0100_TO_23, threadId);
tagMap.put(THREAD_ID_TAG_0100_TO_23, threadId);
return tagMap;
}

public Map<String, String> threadLevelTagMap(final String threadId, final String... tags) {
final Map<String, String> tagMap = threadLevelTagMap(threadId);
return addTags(tagMap, tags);
if (tags != null) {
if ((tags.length % 2) != 0) {
throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
}

for (int i = 0; i < tags.length; i += 2) {
tagMap.put(tags[i], tags[i + 1]);
}
}
return tagMap;
}

public final void removeAllClientLevelMetrics() {
Expand All @@ -271,15 +279,6 @@ public Map<String, String> taskLevelTagMap(final String threadId, final String t
return tagMap;
}


public Map<String, String> nodeLevelTagMap(final String threadId,
final String taskName,
final String processorNodeName) {
final Map<String, String> tagMap = taskLevelTagMap(threadId, taskName);
tagMap.put(PROCESSOR_NODE_ID_TAG, processorNodeName);
return tagMap;
}

public Map<String, String> storeLevelTagMap(final String threadId,
final String taskName,
final String storeType,
Expand All @@ -289,28 +288,6 @@ public Map<String, String> storeLevelTagMap(final String threadId,
return tagMap;
}

public Map<String, String> bufferLevelTagMap(final String threadId,
final String taskName,
final String bufferName) {
final Map<String, String> tagMap = taskLevelTagMap(threadId, taskName);
tagMap.put(BUFFER_ID_TAG, bufferName);
return tagMap;
}

private Map<String, String> addTags(final Map<String, String> tagMap,
final String... tags) {
if (tags != null) {
if ((tags.length % 2) != 0) {
throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
}

for (int i = 0; i < tags.length; i += 2) {
tagMap.put(tags[i], tags[i + 1]);
}
}
return tagMap;
}

public final Sensor taskLevelSensor(final String threadId,
final String taskId,
final String sensorName,
Expand Down Expand Up @@ -497,16 +474,33 @@ public void recordThroughput(final Sensor sensor, final long value) {
sensor.record(value);
}

public final Map<String, String> tagMap(final String threadId, final String... tags) {
final Map<String, String> tagMap = new LinkedHashMap<>();
tagMap.put(THREAD_ID_TAG_0100_TO_23, threadId);
if (tags != null) {
if ((tags.length % 2) != 0) {
throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
}

for (int i = 0; i < tags.length; i += 2) {
tagMap.put(tags[i], tags[i + 1]);
}
}
return tagMap;
}


private Map<String, String> constructTags(final String threadId,
final String scopeName,
final String entityName,
final String... tags) {
final String[] updatedTags = Arrays.copyOf(tags, tags.length + 2);
updatedTags[tags.length] = scopeName + "-id";
updatedTags[tags.length + 1] = entityName;
return threadLevelTagMap(threadId, updatedTags);
return tagMap(threadId, updatedTags);
}


/**
* @throws IllegalArgumentException if tags is not constructed in key-value pairs
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void init(final ProcessorContext context,
addInvocationRateAndCountToSensor(
expiredRecordSensor,
"stream-" + metricScope + "-metrics",
metrics.storeLevelTagMap(threadId, taskName, metricScope, name()),
metrics.tagMap(threadId, "task-id", taskName, metricScope + "-id", name()),
EXPIRED_WINDOW_RECORD_DROP
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void init(final ProcessorContext context, final StateStore root) {
addInvocationRateAndCountToSensor(
expiredRecordSensor,
"stream-" + metricScope + "-metrics",
metrics.storeLevelTagMap(threadId, taskName, metricScope, name()),
metrics.tagMap(threadId, "task-id", taskName, metricScope + "-id", name()),
EXPIRED_WINDOW_RECORD_DROP
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void init(final ProcessorContext context, final StateStore root) {
addInvocationRateAndCountToSensor(
expiredRecordSensor,
"stream-" + metricScope + "-metrics",
metrics.storeLevelTagMap(threadId, taskName, metricScope, name()),
metrics.tagMap(threadId, "task-id", taskName, metricScope + "-id", name()),
EXPIRED_WINDOW_RECORD_DROP
);

Expand Down Expand Up @@ -505,4 +505,4 @@ private Windowed<Bytes> getWindowedKey() {
return new Windowed<>(key, timeWindow);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,13 @@ private static Sensor getBufferSizeOrCountSensor(final StateStore store,

final String metricsGroup = "stream-buffer-metrics";

final Map<String, String> tags = metrics.bufferLevelTagMap(threadId, context.taskId().toString(), store.name());
final Map<String, String> tags = metrics.tagMap(
threadId,
"task-id",
context.taskId().toString(),
"buffer-id",
store.name()
);

sensor.add(
new MetricName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
"stream-processor-node-metrics",
"The total number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("thread-id", threadId),
mkEntry("client-id", threadId),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "TESTING_NODE")
)
Expand All @@ -434,7 +434,7 @@ public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
"stream-processor-node-metrics",
"The average number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("thread-id", threadId),
mkEntry("client-id", threadId),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "TESTING_NODE")
)
Expand Down Expand Up @@ -495,7 +495,7 @@ public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
"stream-processor-node-metrics",
"The total number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("thread-id", threadId),
mkEntry("client-id", threadId),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "TESTING_NODE")
)
Expand All @@ -508,7 +508,7 @@ public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
"stream-processor-node-metrics",
"The average number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("thread-id", threadId),
mkEntry("client-id", threadId),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "TESTING_NODE")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ private void assertLatenessMetrics(final TopologyTestDriver driver,
"stream-processor-node-metrics",
"The total number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("thread-id", threadId),
mkEntry("client-id", threadId),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
)
Expand All @@ -423,7 +423,7 @@ private void assertLatenessMetrics(final TopologyTestDriver driver,
"stream-processor-node-metrics",
"The average number of occurrence of late-record-drop operations.",
mkMap(
mkEntry("thread-id", threadId),
mkEntry("client-id", threadId),
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
)
Expand All @@ -435,7 +435,7 @@ private void assertLatenessMetrics(final TopologyTestDriver driver,
"stream-task-metrics",
"The max observed lateness of records.",
mkMap(
mkEntry("thread-id", threadId),
mkEntry("client-id", threadId),
mkEntry("task-id", "0_0")
)
);
Expand All @@ -446,7 +446,7 @@ private void assertLatenessMetrics(final TopologyTestDriver driver,
"stream-task-metrics",
"The average observed lateness of records.",
mkMap(
mkEntry("thread-id", threadId),
mkEntry("client-id", threadId),
mkEntry("task-id", "0_0")
)
);
Expand Down
Loading

0 comments on commit d8a6812

Please sign in to comment.