diff --git a/hertzbeat-collector/hertzbeat-collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java b/hertzbeat-collector/hertzbeat-collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java
index ac48029bd52..f15ba3c4a37 100644
--- a/hertzbeat-collector/hertzbeat-collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java
+++ b/hertzbeat-collector/hertzbeat-collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java
@@ -24,26 +24,41 @@
 import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.springframework.util.Assert;
-
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 @Slf4j
 public class KafkaCollectImpl extends AbstractCollect {
+    private static final String LAG_NUM = "lag_num";
+    private static final String PARTITION_OFFSET = "partition_offset";
+
     @Override
     public void preCheck(Metrics metrics) throws IllegalArgumentException {
         KafkaProtocol kafkaProtocol = metrics.getKclient();
@@ -79,6 +94,9 @@ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, Stri
             case TOPIC_OFFSET:
                 collectTopicOffset(builder, adminClient);
                 break;
+            case CONSUMER_DETAIL:
+                collectTopicConsumerGroups(builder, adminClient);
+                break;
             default:
                 log.error("Unsupported command: {}", command);
                 break;
@@ -203,6 +221,98 @@ private static void collectTopicDescribe(CollectRep.MetricsData.Builder builder,
         });
     }
 
+    /**
+     * Collect consumer group metrics for each topic
+     *
+     * @param builder     The MetricsData builder
+     * @param adminClient The AdminClient
+     */
+    private static void collectTopicConsumerGroups(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException {
+        // Get all consumer groups
+        ListConsumerGroupsResult consumerGroupsResult = adminClient.listConsumerGroups();
+        Collection<ConsumerGroupListing> consumerGroups = consumerGroupsResult.all().get();
+        // Get the set of consumer groups for each topic
+        Map<String, Set<String>> topicConsumerGroupsMap = getTopicConsumerGroupsMap(consumerGroups, adminClient);
+        topicConsumerGroupsMap.entrySet().stream()
+                .flatMap(entry -> entry.getValue().stream()
+                        .map(groupId -> {
+                            try {
+                                String topic = entry.getKey();
+                                DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Collections.singletonList(groupId));
+                                Map<String, ConsumerGroupDescription> consumerGroupDescriptions = describeResult.all().get();
+                                ConsumerGroupDescription description = consumerGroupDescriptions.get(groupId);
+                                Map<String, String> offsetAndLagNum = getConsumerGroupMetrics(topic, groupId, adminClient);
+                                return CollectRep.ValueRow.newBuilder()
+                                        .addColumns(groupId)
+                                        .addColumns(String.valueOf(description.members().size()))
+                                        .addColumns(topic)
+                                        .addColumns(offsetAndLagNum.get(PARTITION_OFFSET))
+                                        .addColumns(offsetAndLagNum.get(LAG_NUM))
+                                        .build();
+                            } catch (InterruptedException | ExecutionException e) {
+                                if (e instanceof InterruptedException) {
+                                    // Restore the interrupt flag swallowed inside the lambda
+                                    Thread.currentThread().interrupt();
+                                }
+                                log.warn("Failed to collect detail of consumer group {}", groupId, e);
+                                return null;
+                            }
+                        })
+                )
+                .filter(Objects::nonNull)
+                .forEach(builder::addValues);
+    }
+
+    private static Map<String, Set<String>> getTopicConsumerGroupsMap(Collection<ConsumerGroupListing> consumerGroups,
+                                                                      AdminClient adminClient)
+            throws ExecutionException, InterruptedException {
+        Map<String, Set<String>> topicConsumerGroupsMap = new HashMap<>();
+        for (ConsumerGroupListing consumerGroup : consumerGroups) {
+            String groupId = consumerGroup.groupId();
+            // Get the offset information for the consumer group
+            ListConsumerGroupOffsetsResult consumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(groupId);
+            Map<TopicPartition, OffsetAndMetadata> topicOffsets = consumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
+            // Iterate over all TopicPartitions consumed by the consumer group
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicOffsets.entrySet()) {
+                String topic = entry.getKey().topic();
+                topicConsumerGroupsMap.computeIfAbsent(topic, k -> new HashSet<>()).add(groupId);
+            }
+        }
+        return topicConsumerGroupsMap;
+    }
+
+    private static Map<String, String> getConsumerGroupMetrics(String topic, String groupId, AdminClient adminClient)
+            throws ExecutionException, InterruptedException {
+        // Get the committed offset of each partition for the given groupId
+        ListConsumerGroupOffsetsResult consumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(groupId);
+        Map<TopicPartition, OffsetAndMetadata> topicOffsets = consumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
+        long totalLag = 0L;
+        for (Entry<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataEntry : topicOffsets.entrySet()) {
+            if (topicPartitionOffsetAndMetadataEntry.getKey().topic().equals(topic)) {
+                OffsetAndMetadata offsetMetadata = topicPartitionOffsetAndMetadataEntry.getValue();
+                TopicPartition partition = topicPartitionOffsetAndMetadataEntry.getKey();
+                // Get the latest offset for each TopicPartition
+                ListOffsetsResultInfo resultInfo = adminClient.listOffsets(
+                        Collections.singletonMap(partition, OffsetSpec.latest())).all().get().get(partition);
+                long latestOffset = resultInfo.offset();
+                // Accumulate the lag of each partition
+                long lag = latestOffset - offsetMetadata.offset();
+                totalLag += lag;
+            }
+        }
+        // Join all partition offsets of this topic into a bracketed, comma-separated string
+        String partitionOffsets = topicOffsets.entrySet().stream()
+                .filter(entry -> entry.getKey().topic().equals(topic))
+                .map(entry -> String.valueOf(entry.getValue().offset()))
+                .collect(Collectors.collectingAndThen(
+                        Collectors.joining(","),
+                        result -> "[" + result + "]"
+                ));
+        Map<String, String> res = new HashMap<>();
+        res.put(LAG_NUM, String.valueOf(totalLag));
+        res.put(PARTITION_OFFSET, partitionOffsets);
+        return res;
+    }
+
+
     @Override
     public String supportProtocol() {
         return DispatchConstants.PROTOCOL_KAFKA;
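
To sanity-check the lag numbers that the new `consumer-detail` command reports, here is a minimal standalone sketch of the same AdminClient calls used in `getConsumerGroupMetrics`. The broker address `localhost:9092` and the group id `demo-group` are assumptions for illustration only; per-partition lag is the log-end offset (`OffsetSpec.latest()`) minus the group's committed offset.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerLagCheck {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Assumed broker address, for illustration only
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offset of every partition the (hypothetical) group has consumed
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets("demo-group")
                    .partitionsToOffsetAndMetadata()
                    .get();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
                TopicPartition tp = e.getKey();
                // Log-end offset of the same partition
                long latest = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
                        .all().get().get(tp).offset();
                // Lag = log-end offset minus committed offset, as in getConsumerGroupMetrics
                System.out.printf("%s lag=%d%n", tp, latest - e.getValue().offset());
            }
        }
    }
}
```

Like the patch, this issues one `listOffsets` request per partition. Since `listOffsets` accepts a map of many partitions, batching all partitions of a topic into a single call would cut broker round trips.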
diff --git a/hertzbeat-collector/hertzbeat-collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java b/hertzbeat-collector/hertzbeat-collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java
index 911a3529dc1..050cde24f70 100644
--- a/hertzbeat-collector/hertzbeat-collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java
+++ b/hertzbeat-collector/hertzbeat-collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java
@@ -27,7 +27,8 @@ public enum SupportedCommand {
 
     TOPIC_DESCRIBE("topic-describe"),
     TOPIC_LIST("topic-list"),
-    TOPIC_OFFSET("topic-offset");
+    TOPIC_OFFSET("topic-offset"),
+    CONSUMER_DETAIL("consumer-detail");
 
     private static Set<String> SUPPORTED_COMMAND = new HashSet<>();
diff --git a/hertzbeat-manager/src/main/resources/define/app-kafka_client.yml b/hertzbeat-manager/src/main/resources/define/app-kafka_client.yml
index 46ab6fb241e..cb388d088b3 100644
--- a/hertzbeat-manager/src/main/resources/define/app-kafka_client.yml
+++ b/hertzbeat-manager/src/main/resources/define/app-kafka_client.yml
@@ -123,7 +123,7 @@ metrics:
       - field: PartitionNum
         type: 1
         i18n:
-          zh-CN: 分区数量
+          zh-CN: 分区号
          en-US: Partition Num
       - field: earliest
         type: 0
@@ -140,4 +140,41 @@ metrics:
       host: ^_^host^_^
       port: ^_^port^_^
       command: topic-offset
-
+  - name: consumer_detail
+    i18n:
+      zh-CN: 消费者组情况
+      en-US: Consumer Detail Info
+    priority: 3
+    # Consumer offsets do not need frequent collection; polling too often can hurt broker performance
+    interval: 300
+    fields:
+      - field: GroupId
+        type: 1
+        i18n:
+          zh-CN: 消费者组ID
+          en-US: Consumer Group ID
+      - field: Group Member Num
+        type: 1
+        i18n:
+          zh-CN: 消费者实例数量
+          en-US: Group Member Num
+      - field: Topic
+        type: 1
+        i18n:
+          zh-CN: 订阅主题名称
+          en-US: Subscribed Topic Name
+      - field: Offset of Each Partition
+        type: 1
+        i18n:
+          zh-CN: 各分区偏移量
+          en-US: Offset of Each Partition
+      - field: Lag
+        type: 0
+        i18n:
+          zh-CN: 落后偏移量
+          en-US: Total Lag
+    protocol: kclient
+    kclient:
+      host: ^_^host^_^
+      port: ^_^port^_^
+      command: consumer-detail
\ No newline at end of file
diff --git a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md
index 7010174a357..f883044734b 100644
--- a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md
+++ b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md
@@ -39,9 +39,19 @@ keywords: [开源监控系统, 开源消息中间件监控, Kafka监控]
 
 #### 指标集合:topic_offset
 
-| 指标名称 | 指标单位 | 指标帮助描述 |
-|-------|---|---------|
-| TopicName | 无 | 主题名称 |
-| PartitionNum | 无 | 分区数量 |
-| earliest | 无 | 最早偏移量 |
-| latest | 无 | 最新偏移量 |
+| 指标名称 | 指标单位 | 指标帮助描述 |
+|-------|---|--------|
+| TopicName | 无 | 主题名称 |
+| PartitionNum | 无 | 分区号 |
+| earliest | 无 | 最早偏移量 |
+| latest | 无 | 最新偏移量 |
+
+#### 指标集合:consumer_detail
+
+| 指标名称 | 指标单位 | 指标帮助描述 |
+|-----------|------|-------|
+| GroupId | 无 | 消费者组ID |
+| Group Member Num | 无 | 消费者实例数量 |
+| Topic | 无 | 订阅主题名称 |
+| Offset of Each Partition | 无 | 各分区偏移量 |
+| Lag | 无 | 落后偏移量 |
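
For reference, the `GroupId` and `Group Member Num` columns above map to plain AdminClient calls. Below is a standalone sketch (again assuming a broker at `localhost:9092`) that lists every group and describes them all in one batched request, rather than the one-group-per-request pattern used in `collectTopicConsumerGroups`.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;

public class ConsumerGroupOverview {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        try (AdminClient admin = AdminClient.create(props)) {
            // Every consumer group known to the cluster
            Collection<ConsumerGroupListing> groups = admin.listConsumerGroups().all().get();
            // One batched describe call instead of one round trip per group
            Map<String, ConsumerGroupDescription> descriptions = admin
                    .describeConsumerGroups(groups.stream()
                            .map(ConsumerGroupListing::groupId)
                            .collect(Collectors.toList()))
                    .all().get();
            descriptions.forEach((groupId, description) ->
                    System.out.printf("%s members=%d state=%s%n",
                            groupId, description.members().size(), description.state()));
        }
    }
}
```

Because `describeConsumerGroups` accepts a collection of group ids, batching like this keeps the number of broker round trips constant as the number of groups grows; it could be a worthwhile follow-up optimization for this collector.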