Skip to content

Commit

Permalink
[feature] support kafka admin monitor (apache#2733)
Browse files Browse the repository at this point in the history
Co-authored-by: shown <[email protected]>
Co-authored-by: tomsun28 <[email protected]>
  • Loading branch information
3 people authored Sep 22, 2024
1 parent 5fa88ec commit ec195c1
Show file tree
Hide file tree
Showing 14 changed files with 823 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,9 @@ public interface DispatchConstants {
String PARSE_PROM_QL = "PromQL";
String PARSE_PROM_QL_VECTOR = "vector";
String PARSE_PROM_QL_MATRIX = "matrix";

/**
* protocol kafka
*/
String PROTOCOL_KAFKA = "kclient";
}
50 changes: 50 additions & 0 deletions collector/collector-kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hertzbeat</groupId>
<artifactId>hertzbeat-collector</artifactId>
<version>2.0-SNAPSHOT</version>
</parent>

<artifactId>hertzbeat-collector-kafka</artifactId>
<name>${project.artifactId}</name>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>


<dependencies>
<dependency>
<groupId>org.apache.hertzbeat</groupId>
<artifactId>hertzbeat-collector-common</artifactId>
<scope>provided</scope>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.collector.collect.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.collect.AbstractCollect;
import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
import org.apache.hertzbeat.common.entity.job.Metrics;
import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.util.Assert;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Slf4j
public class KafkaCollectImpl extends AbstractCollect {

@Override
public void preCheck(Metrics metrics) throws IllegalArgumentException {
KafkaProtocol kafkaProtocol = metrics.getKclient();
// Ensure that metrics and kafkaProtocol are not null
Assert.isTrue(metrics != null && kafkaProtocol != null, "Kafka collect must have kafkaProtocol params");
// Ensure that host and port are not empty
Assert.hasText(kafkaProtocol.getHost(), "Kafka Protocol host is required.");
Assert.hasText(kafkaProtocol.getPort(), "Kafka Protocol port is required.");
}

@Override
public void collect(CollectRep.MetricsData.Builder builder, long monitorId, String app, Metrics metrics) {
try {
KafkaProtocol kafkaProtocol = metrics.getKclient();
String command = kafkaProtocol.getCommand();
boolean isKafkaCommand = SupportedCommand.isKafkaCommand(command);
if (!isKafkaCommand) {
log.error("Unsupported command: {}", command);
return;
}

// Create AdminClient with the provided host and port
AdminClient adminClient = KafkaConnect.getAdminClient(kafkaProtocol.getHost() + ":" + kafkaProtocol.getPort());

// Execute the appropriate collection method based on the command
switch (SupportedCommand.fromCommand(command)) {
case TOPIC_DESCRIBE:
collectTopicDescribe(builder, adminClient);
break;
case TOPIC_LIST:
collectTopicList(builder, adminClient);
break;
case TOPIC_OFFSET:
collectTopicOffset(builder, adminClient);
break;
default:
log.error("Unsupported command: {}", command);
break;
}
} catch (InterruptedException | ExecutionException e) {
log.error("Kafka collect error", e);
}
}

/**
* Collect the earliest and latest offsets for each topic
*
* @param builder The MetricsData builder
* @param adminClient The AdminClient
* @throws InterruptedException If the thread is interrupted
* @throws ExecutionException If an error occurs during execution
*/
private void collectTopicOffset(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException {
ListTopicsResult listTopicsResult = adminClient.listTopics(new ListTopicsOptions().listInternal(true));
Set<String> names = listTopicsResult.names().get();
names.forEach(name -> {
try {
Map<String, TopicDescription> map = adminClient.describeTopics(Collections.singleton(name)).all().get(3L, TimeUnit.SECONDS);
map.forEach((key, value) -> value.partitions().forEach(info -> extractedOffset(builder, adminClient, name, value, info)));
} catch (TimeoutException | InterruptedException | ExecutionException e) {
log.warn("Topic {} get offset fail", name);
}
});
}

private void extractedOffset(CollectRep.MetricsData.Builder builder, AdminClient adminClient, String name, TopicDescription value, TopicPartitionInfo info) {
try {
TopicPartition topicPartition = new TopicPartition(value.name(), info.partition());
long earliestOffset = getEarliestOffset(adminClient, topicPartition);
long latestOffset = getLatestOffset(adminClient, topicPartition);
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
valueRowBuilder.addColumns(value.name());
valueRowBuilder.addColumns(String.valueOf(info.partition()));
valueRowBuilder.addColumns(String.valueOf(earliestOffset));
valueRowBuilder.addColumns(String.valueOf(latestOffset));
builder.addValues(valueRowBuilder.build());
} catch (TimeoutException | InterruptedException | ExecutionException e) {
log.warn("Topic {} get offset fail", name);
}
}

/**
* Get the earliest offset for a given topic partition
*
* @param adminClient The AdminClient
* @param topicPartition The TopicPartition
* @return The earliest offset
*/
private long getEarliestOffset(AdminClient adminClient, TopicPartition topicPartition)
throws InterruptedException, ExecutionException, TimeoutException {
return adminClient
.listOffsets(Collections.singletonMap(topicPartition, OffsetSpec.earliest()))
.all()
.get(3L, TimeUnit.SECONDS)
.get(topicPartition)
.offset();
}

/**
* Get the latest offset for a given topic partition
*
* @param adminClient The AdminClient
* @param topicPartition The TopicPartition
* @return The latest offset
*/
private long getLatestOffset(AdminClient adminClient, TopicPartition topicPartition)
throws InterruptedException, ExecutionException, TimeoutException {
return adminClient
.listOffsets(Collections.singletonMap(topicPartition, OffsetSpec.latest()))
.all()
.get(3L, TimeUnit.SECONDS)
.get(topicPartition)
.offset();
}

/**
* Collect the list of topics
*
* @param builder The MetricsData builder
* @param adminClient The AdminClient
*/
private static void collectTopicList(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException {
ListTopicsOptions options = new ListTopicsOptions().listInternal(true);
Set<String> names = adminClient.listTopics(options).names().get();
names.forEach(name -> {
CollectRep.ValueRow valueRow = CollectRep.ValueRow.newBuilder().addColumns(name).build();
builder.addValues(valueRow);
});
}

/**
* Collect the description of each topic
*
* @param builder The MetricsData builder
* @param adminClient The AdminClient
*/
private static void collectTopicDescribe(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException {
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
ListTopicsResult listTopicsResult = adminClient.listTopics(options);
Set<String> names = listTopicsResult.names().get();
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(names);
Map<String, TopicDescription> map = describeTopicsResult.all().get();
map.forEach((key, value) -> {
List<TopicPartitionInfo> listp = value.partitions();
listp.forEach(info -> {
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
valueRowBuilder.addColumns(value.name());
valueRowBuilder.addColumns(String.valueOf(value.partitions().size()));
valueRowBuilder.addColumns(String.valueOf(info.partition()));
valueRowBuilder.addColumns(info.leader().host());
valueRowBuilder.addColumns(String.valueOf(info.leader().port()));
valueRowBuilder.addColumns(String.valueOf(info.replicas().size()));
valueRowBuilder.addColumns(String.valueOf(info.replicas()));
builder.addValues(valueRowBuilder.build());
});
});
}

@Override
public String supportProtocol() {
return DispatchConstants.PROTOCOL_KAFKA;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.collector.collect.kafka;

import org.apache.hertzbeat.collector.collect.common.cache.AbstractConnection;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;

import java.util.Properties;

/**
* Kafka connection
*/
public class KafkaConnect extends AbstractConnection<AdminClient> {


private static AdminClient adminClient;

public KafkaConnect(String brokerList) {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(AdminClientConfig.RETRIES_CONFIG, 3);
properties.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, 500);
adminClient = KafkaAdminClient.create(properties);
}

@Override
public AdminClient getConnection() {
return adminClient;
}

@Override
public void closeConnection() throws Exception {
if (this.adminClient != null) {
this.adminClient.close();
}
}

public static synchronized AdminClient getAdminClient(String brokerList) {
if (adminClient == null) {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
adminClient = KafkaAdminClient.create(properties);
}
return adminClient;
}

}
Loading

0 comments on commit ec195c1

Please sign in to comment.