extend kafka dev ui
Nidum committed Aug 3, 2022
1 parent b8a3d07 commit b9347b1
Showing 29 changed files with 2,139 additions and 52 deletions.
@@ -1,12 +1,8 @@
package io.quarkus.kafka.client.runtime;

import io.quarkus.kafka.client.runtime.ui.model.request.KafkaCreateTopicRequest;
import io.smallrye.common.annotation.Identifier;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -52,6 +48,14 @@ public Collection<TopicListing> getTopics() throws InterruptedException, Executi
return client.listTopics().listings().get();
}

public Collection<ConsumerGroupDescription> getConsumerGroups() throws InterruptedException, ExecutionException {
var consumerGroupIds = client.listConsumerGroups().all().get().stream()
.map(ConsumerGroupListing::groupId)
.collect(Collectors.toList());
return client.describeConsumerGroups(consumerGroupIds).all().get()
.values();
}

public boolean deleteTopic(String name) {
Collection<String> topics = new ArrayList<>();
topics.add(name);
@@ -68,6 +72,10 @@ public boolean createTopic(KafkaCreateTopicRequest kafkaCreateTopicRq) {
return ctr.values() != null;
}

public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
return client.listConsumerGroupOffsets(groupId);
}

public Collection<AclBinding> getAclInfo() throws InterruptedException, ExecutionException {
AclBindingFilter filter = new AclBindingFilter(ResourcePatternFilter.ANY, AccessControlEntryFilter.ANY);
var options = new DescribeAclsOptions().timeoutMs(1_000);
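For context on the two methods added above, here is a minimal standalone sketch (not part of this commit) of the same Kafka Admin API call chains run directly against a broker. The bootstrap address and the printed output are illustrative assumptions only.

import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;

public class ConsumerGroupInspector {
    public static void main(String[] args) throws Exception {
        // Assumed local broker address, for illustration only.
        var conf = Map.<String, Object>of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (var admin = AdminClient.create(conf)) {
            // Same chain as KafkaAdminClient#getConsumerGroups: list group ids, then describe them.
            var groupIds = admin.listConsumerGroups().all().get().stream()
                    .map(ConsumerGroupListing::groupId)
                    .collect(Collectors.toList());
            var descriptions = admin.describeConsumerGroups(groupIds).all().get();
            descriptions.forEach((id, description) -> System.out.printf("%s: state=%s, members=%d%n",
                    id, description.state(), description.members().size()));

            // Committed offsets per group, as exposed by the new listConsumerGroupOffsets(groupId) wrapper.
            for (var groupId : groupIds) {
                var offsets = admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
                offsets.forEach((tp, oam) -> System.out.printf("  %s -> committed offset %d%n", tp, oam.offset()));
            }
        }
    }
}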
@@ -1,26 +1,48 @@
package io.quarkus.kafka.client.runtime.ui;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static io.quarkus.kafka.client.runtime.ui.util.ConsumerFactory.createConsumer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.inject.Singleton;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;

import io.quarkus.kafka.client.runtime.ui.model.Order;
import io.quarkus.kafka.client.runtime.ui.model.converter.KafkaModelConverter;
import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessageCreateRequest;
import io.quarkus.kafka.client.runtime.ui.model.response.KafkaMessagePage;
import io.smallrye.common.annotation.Identifier;

@Singleton
@ApplicationScoped
public class KafkaTopicClient {
// TODO: make configurable
private static final int RETRIES = 3;

//TODO: inject me
private AdminClient adminClient;

KafkaModelConverter modelConverter = new KafkaModelConverter();

@Inject
@Identifier("default-kafka-broker")
Map<String, Object> config;
@@ -32,6 +54,210 @@ void init() {
adminClient = AdminClient.create(conf);
}

private Producer<Bytes, Bytes> createProducer() {
Map<String, Object> config = new HashMap<>(this.config);

config.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-ui-producer-" + UUID.randomUUID());
// TODO: make generic to support AVRO serializer
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName());

return new KafkaProducer<>(config);
}

/**
* Reads messages from a particular topic. The offset for the next page is returned within the response.
* The first/last page offsets can be retrieved with the
* {@link KafkaTopicClient#getPagePartitionOffset(String, Collection, Order)}
* method.
*
* @param topicName topic to read messages from
* @param order ascending or descending. Defaults to ascending (oldest first)
* @param partitionOffsets the per-partition offsets of the page to be read
* @param pageSize size of the page to read
* @return page of messages matching the requested filters
*/
public KafkaMessagePage getTopicMessages(
String topicName,
Order order,
Map<Integer, Long> partitionOffsets,
int pageSize)
throws ExecutionException, InterruptedException {
assertParamsValid(pageSize, partitionOffsets);

var requestedPartitions = partitionOffsets.keySet();
assertRequestedPartitionsExist(topicName, requestedPartitions);
if (order == null)
order = Order.OLD_FIRST;

var allPartitionsResult = getConsumerRecords(topicName, order, pageSize, requestedPartitions, partitionOffsets,
pageSize);

Comparator<ConsumerRecord<Bytes, Bytes>> comparator = Comparator.comparing(ConsumerRecord::timestamp);
if (Order.NEW_FIRST == order)
comparator = comparator.reversed();
allPartitionsResult.sort(comparator);

// We might have too many values. Throw away the items that don't fit into the page.
if (allPartitionsResult.size() > pageSize) {
allPartitionsResult = allPartitionsResult.subList(0, pageSize);
}

var newOffsets = calculateNewPartitionOffset(partitionOffsets, allPartitionsResult, order, topicName);
var convertedResult = allPartitionsResult.stream()
.map(modelConverter::convert)
.collect(Collectors.toList());
return new KafkaMessagePage(newOffsets, convertedResult);
}

// Fail fast on wrong params, even before querying Kafka.
private void assertParamsValid(int pageSize, Map<Integer, Long> partitionOffsets) {
if (pageSize <= 0)
throw new IllegalArgumentException("Page size must be > 0.");

if (partitionOffsets == null || partitionOffsets.isEmpty())
throw new IllegalArgumentException("Partition offset map must be specified.");

for (var partitionOffset : partitionOffsets.entrySet()) {
if (partitionOffset.getValue() < 0)
throw new IllegalArgumentException(
"Partition offset must be > 0.");
}
}

private ConsumerRecords<Bytes, Bytes> pollWhenReady(Consumer<Bytes, Bytes> consumer) {
var attempts = 0;
var pullDuration = Duration.of(100, ChronoUnit.MILLIS);
var result = consumer.poll(pullDuration);

while (result.isEmpty() && attempts < RETRIES) {
result = consumer.poll(pullDuration);
attempts++;
}
return result;
}

/*
* FIXME: should take the compaction strategy into account: the new offset is not necessarily old + total records read, because some
* records might have been deleted, so we'll end up seeing duplicates on some pages.
* Imagine this case:
* - page size = 10
* - 30 messages pushed, values are incremental 1 ... 30.
* - message 10 gets removed by compaction, because message 15 has the same key
* - we request page 1. It had offset 0. We return values [1, 2, 3, ..., 9, 11], 10 in total. The new offset for page 2 =
* 0 + totalRecords = 10.
* - we request page 2. We read starting from offset = 10. There is no message with that offset, but we see message 11 again
* instead.
*/
private Map<Integer, Long> calculateNewPartitionOffset(Map<Integer, Long> oldPartitionOffset,
Collection<ConsumerRecord<Bytes, Bytes>> records, Order order, String topicName) {
var newOffsets = records.stream().map(ConsumerRecord::partition)
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

var newPartitionOffset = new HashMap<Integer, Long>();
for (var partition : oldPartitionOffset.keySet()) {
// Add when reading oldest-first and subtract when reading newest-first.
var multiplier = Order.OLD_FIRST == order ? 1 : -1;

// If there is no new offset for a partition in the map, we didn't read any records for that partition, so just keep the old offset.
var newOffset = oldPartitionOffset.get(partition) + multiplier * newOffsets.getOrDefault(partition, 0L);
newPartitionOffset.put(partition, newOffset);
}
return newPartitionOffset;
}

private long getPosition(String topicName, int partition, Order order) {
try (var consumer = createConsumer(topicName, partition, this.config)) {
var topicPartition = new TopicPartition(topicName, partition);
if (Order.NEW_FIRST == order) {
consumer.seekToEnd(List.of(topicPartition));
} else {
consumer.seekToBeginning(List.of(topicPartition));
}
return consumer.position(topicPartition);
}
}

public Map<Integer, Long> getPagePartitionOffset(String topicName, Collection<Integer> requestedPartitions, Order order)
throws ExecutionException, InterruptedException {
assertRequestedPartitionsExist(topicName, requestedPartitions);

var result = new HashMap<Integer, Long>();
for (var requestedPartition : requestedPartitions) {
var maxPosition = getPosition(topicName, requestedPartition, order);
result.put(requestedPartition, maxPosition);
}

return result;
}

private List<ConsumerRecord<Bytes, Bytes>> getConsumerRecords(String topicName, Order order, int pageSize,
Collection<Integer> requestedPartitions, Map<Integer, Long> start, int totalMessages) {
List<ConsumerRecord<Bytes, Bytes>> allPartitionsResult = new ArrayList<>();

// Request a full page from each partition and then filter out the redundant data. This ensures we read the data in historical order.
for (var requestedPartition : requestedPartitions) {
List<ConsumerRecord<Bytes, Bytes>> partitionResult = new ArrayList<>();
var offset = start.get(requestedPartition);
try (var consumer = createConsumer(topicName, requestedPartition, this.config)) {
// Move the pointer to the current read position. It might differ per partition, so we seek with a per-partition offset.
var partition = new TopicPartition(topicName, requestedPartition);

var seekedOffset = Order.OLD_FIRST == order ? offset : Long.max(offset - pageSize, 0);
consumer.seek(partition, seekedOffset);

var numberOfMessagesReadSoFar = 0;
var keepOnReading = true;

while (keepOnReading) {
var records = pollWhenReady(consumer);
if (records.isEmpty())
keepOnReading = false;

for (var record : records) {
numberOfMessagesReadSoFar++;
partitionResult.add(record);

if (numberOfMessagesReadSoFar >= totalMessages) {
keepOnReading = false;
break;
}
}
}
// We need to cut off the result if the offset was reset to 0, as we don't want to see entries from old pages.
if (Order.NEW_FIRST == order && seekedOffset == 0 && partitionResult.size() > offset.intValue()) {
partitionResult.sort(Comparator.comparing(ConsumerRecord::timestamp));
partitionResult = partitionResult.subList(0, offset.intValue());
}

}
allPartitionsResult.addAll(partitionResult);
}
return allPartitionsResult;
}

private void assertRequestedPartitionsExist(String topicName, Collection<Integer> requestedPartitions)
throws InterruptedException, ExecutionException {
var topicPartitions = partitions(topicName);

if (!new HashSet<>(topicPartitions).containsAll(requestedPartitions)) {
throw new IllegalArgumentException(String.format(
"Requested messages from partition, that do not exist. Requested partitions: %s. Existing partitions: %s",
requestedPartitions, topicPartitions));
}
}

public void createMessage(KafkaMessageCreateRequest request) {
var record = new ProducerRecord<>(request.getTopic(), request.getPartition(), Bytes.wrap(request.getKey().getBytes()),
Bytes.wrap(request.getValue().getBytes())
//TODO: support headers
);

try (var producer = createProducer()) {
producer.send(record);
}
}

public List<Integer> partitions(String topicName) throws ExecutionException, InterruptedException {
return adminClient.describeTopics(List.of(topicName))
.allTopicNames()
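A note on the reading logic above: getConsumerRecords relies on a createConsumer helper from ConsumerFactory that is not part of this excerpt. The sketch below shows roughly what such a helper and a single-partition seek-and-poll page read could look like; the topic name, bootstrap address, starting offset, and page size are illustrative assumptions.

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;

public class PartitionPageReader {

    // Rough stand-in for ConsumerFactory.createConsumer: a consumer that is manually
    // assigned to a single partition, so no consumer-group coordination is involved.
    static Consumer<Bytes, Bytes> createConsumer(String topic, int partition, String bootstrapServers) {
        var config = Map.<String, Object>of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + UUID.randomUUID(),
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName(),
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
        var consumer = new KafkaConsumer<Bytes, Bytes>(config);
        consumer.assign(List.of(new TopicPartition(topic, partition)));
        return consumer;
    }

    public static void main(String[] args) {
        // Read one page of records from partition 0, starting at offset 40 (oldest-first).
        try (var consumer = createConsumer("my-topic", 0, "localhost:9092")) {
            var partition = new TopicPartition("my-topic", 0);
            consumer.seek(partition, 40L);
            var records = consumer.poll(Duration.ofMillis(100));
            records.forEach(r -> System.out.printf("offset=%d timestamp=%d%n", r.offset(), r.timestamp()));
        }
    }
}

The offset bookkeeping in calculateNewPartitionOffset then advances the stored page cursor by the number of records actually read per partition: a page that started at offset 40 and returned 7 records oldest-first yields a next-page offset of 40 + 7 = 47, while newest-first would move it to 40 - 7 = 33 (subject to the compaction caveat in the FIXME above).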
@@ -10,6 +10,9 @@
import io.quarkus.arc.Arc;
import io.quarkus.kafka.client.runtime.KafkaAdminClient;
import io.quarkus.kafka.client.runtime.ui.model.request.KafkaCreateTopicRequest;
import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessageCreateRequest;
import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessagesRequest;
import io.quarkus.kafka.client.runtime.ui.model.request.KafkaOffsetRequest;
import io.quarkus.security.identity.CurrentIdentityAssociation;
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
import io.vertx.core.http.HttpServerRequest;
@@ -66,6 +69,27 @@ public void handlePost(RoutingContext event) {
message = webUtils.toJson(webUtils.getTopics());
res = true;
break;
case "topicMessages":
var msgRequest = event.body().asPojo(KafkaMessagesRequest.class);
message = webUtils.toJson(webUtils.getMessages(msgRequest));
res = true;
break;
case "getOffset":
var request = event.body().asPojo(KafkaOffsetRequest.class);
message = webUtils.toJson(webUtils.getOffset(request));
res = true;
break;
case "createMessage":
var rq = event.body().asPojo(KafkaMessageCreateRequest.class);
webUtils.createMessage(rq);
message = "{}";
res = true;
break;
case "getPartitions":
var topicName = body.getString("topicName");
message = webUtils.toJson(webUtils.partitions(topicName));
res = true;
break;
default:
break;
}
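For reference, the new switch branches bind the POST body onto request objects with Vert.x's event.body().asPojo(...). Below is a minimal, self-contained sketch of that binding pattern outside of Quarkus; the route path and the OffsetRequest field names are hypothetical, since the actual KafkaOffsetRequest and KafkaMessagesRequest classes are not shown in this diff, and jackson-databind must be on the classpath for asPojo to work.

import java.util.List;

import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;

public class DispatchSketch {

    // Hypothetical request shape; the real KafkaOffsetRequest fields are not visible in this excerpt.
    public static class OffsetRequest {
        public String topicName;
        public List<Integer> partitions;
        public String order;
    }

    public static void main(String[] args) {
        var vertx = Vertx.vertx();
        var router = Router.router(vertx);
        // BodyHandler must run first so event.body() is populated for POST requests.
        router.route().handler(BodyHandler.create());
        router.post("/kafka-ui").handler(event -> {
            // Same binding mechanism as the "getOffset" branch above.
            var request = event.body().asPojo(OffsetRequest.class);
            event.response().end("offsets requested for " + request.topicName + " " + request.partitions);
        });
        vertx.createHttpServer().requestHandler(router).listen(8080);
    }
}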
