diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java index 78f68171bab4f4..c9b75dc1d00c05 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java @@ -1,12 +1,8 @@ package io.quarkus.kafka.client.runtime; -import io.quarkus.kafka.client.runtime.ui.model.request.KafkaCreateTopicRequest; -import io.smallrye.common.annotation.Identifier; -import org.apache.kafka.clients.admin.*; -import org.apache.kafka.common.acl.AccessControlEntryFilter; -import org.apache.kafka.common.acl.AclBinding; -import org.apache.kafka.common.acl.AclBindingFilter; -import org.apache.kafka.common.resource.ResourcePatternFilter; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -52,6 +48,14 @@ public Collection getTopics() throws InterruptedException, Executi return client.listTopics().listings().get(); } + public Collection getConsumerGroups() throws InterruptedException, ExecutionException { + var consumerGroupIds = client.listConsumerGroups().all().get().stream() + .map(ConsumerGroupListing::groupId) + .collect(Collectors.toList()); + return client.describeConsumerGroups(consumerGroupIds).all().get() + .values(); + } + public boolean deleteTopic(String name) { Collection topics = new ArrayList<>(); topics.add(name); @@ -68,6 +72,10 @@ public boolean createTopic(KafkaCreateTopicRequest kafkaCreateTopicRq) { return ctr.values() != null; } + public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) { + return client.listConsumerGroupOffsets(groupId); + } + public Collection getAclInfo() throws InterruptedException, ExecutionException { AclBindingFilter filter = new AclBindingFilter(ResourcePatternFilter.ANY, AccessControlEntryFilter.ANY); var options = new DescribeAclsOptions().timeoutMs(1_000); diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaTopicClient.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaTopicClient.java index a02d7601857337..174ef04aa08b8f 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaTopicClient.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaTopicClient.java @@ -1,26 +1,48 @@ package io.quarkus.kafka.client.runtime.ui; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import static io.quarkus.kafka.client.runtime.ui.util.ConsumerFactory.createConsumer; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.*; import java.util.concurrent.ExecutionException; +import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.PostConstruct; +import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; -import javax.inject.Singleton; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.utils.Bytes; +import io.quarkus.kafka.client.runtime.ui.model.Order; +import io.quarkus.kafka.client.runtime.ui.model.converter.KafkaModelConverter; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessageCreateRequest; +import io.quarkus.kafka.client.runtime.ui.model.response.KafkaMessagePage; import io.smallrye.common.annotation.Identifier; -@Singleton +@ApplicationScoped public class KafkaTopicClient { + // TODO: make configurable + private static final int RETRIES = 3; + //TODO: inject me private AdminClient adminClient; + KafkaModelConverter modelConverter = new KafkaModelConverter(); + @Inject @Identifier("default-kafka-broker") Map config; @@ -32,6 +54,210 @@ void init() { adminClient = AdminClient.create(conf); } + private Producer createProducer() { + Map config = new HashMap<>(this.config); + + config.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-ui-producer-" + UUID.randomUUID()); + // TODO: make generic to support AVRO serializer + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName()); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName()); + + return new KafkaProducer<>(config); + } + + /** + * Reads the messages from particular topic. Offset for next page is returned within response. + * The first/last page offset could be retrieved with + * {@link KafkaTopicClient#getPagePartitionOffset(String, Collection, Order)} + * method. + * + * @param topicName topic to read messages from + * @param order ascending or descending. Defaults to descending (newest first) + * @param partitionOffsets the offset for page to be read + * @param pageSize size of read page + * @return page of messages, matching requested filters + */ + public KafkaMessagePage getTopicMessages( + String topicName, + Order order, + Map partitionOffsets, + int pageSize) + throws ExecutionException, InterruptedException { + assertParamsValid(pageSize, partitionOffsets); + + var requestedPartitions = partitionOffsets.keySet(); + assertRequestedPartitionsExist(topicName, requestedPartitions); + if (order == null) + order = Order.OLD_FIRST; + + var allPartitionsResult = getConsumerRecords(topicName, order, pageSize, requestedPartitions, partitionOffsets, + pageSize); + + Comparator> comparator = Comparator.comparing(ConsumerRecord::timestamp); + if (Order.NEW_FIRST == order) + comparator = comparator.reversed(); + allPartitionsResult.sort(comparator); + + // We might have too many values. Throw away newer items, which don't fit into page. + if (allPartitionsResult.size() > pageSize) { + allPartitionsResult = allPartitionsResult.subList(0, pageSize); + } + + var newOffsets = calculateNewPartitionOffset(partitionOffsets, allPartitionsResult, order, topicName); + var convertedResult = allPartitionsResult.stream() + .map(modelConverter::convert) + .collect(Collectors.toList()); + return new KafkaMessagePage(newOffsets, convertedResult); + } + + // Fail fast on wrong params, even before querying Kafka. + private void assertParamsValid(int pageSize, Map partitionOffsets) { + if (pageSize <= 0) + throw new IllegalArgumentException("Page size must be > 0."); + + if (partitionOffsets == null || partitionOffsets.isEmpty()) + throw new IllegalArgumentException("Partition offset map must be specified."); + + for (var partitionOffset : partitionOffsets.entrySet()) { + if (partitionOffset.getValue() < 0) + throw new IllegalArgumentException( + "Partition offset must be > 0."); + } + } + + private ConsumerRecords pollWhenReady(Consumer consumer) { + var attempts = 0; + var pullDuration = Duration.of(100, ChronoUnit.MILLIS); + var result = consumer.poll(pullDuration); + + while (result.isEmpty() && attempts < RETRIES) { + result = consumer.poll(pullDuration); + attempts++; + } + return result; + } + + /* + * FIXME: should consider compaction strategy, when our new offset not necessary = old + total records read, but some + * records might be deleted, so we'll end up seeing duplicates on some pages. + * Imagine this case: + * - page size = 10 + * - 30 messages pushed, value is incremental 1 ... 30. + * - message 10 gets removed, as message 15 has same key because of compaction + * - we request page 1. it had offset 0. we return values [1, 2, 3, ..., 9, 11], total of 10. We get new offset for page 2 = + * 0 + totalRecords = 10. + * - we request page 2. we read starting from offset = 10. There is no message with that offset, but we see message 11 again + * instead. + */ + private Map calculateNewPartitionOffset(Map oldPartitionOffset, + Collection> records, Order order, String topicName) { + var newOffsets = records.stream().map(ConsumerRecord::partition) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + + var newPartitionOffset = new HashMap(); + for (var partition : oldPartitionOffset.keySet()) { + // We should add in case we seek for oldest and reduce for newest. + var multiplier = Order.OLD_FIRST == order ? 1 : -1; + + // If new offset for partition is not there in the map - we didn't have records for that partition. So, just take the old offset. + var newOffset = oldPartitionOffset.get(partition) + multiplier * newOffsets.getOrDefault(partition, 0L); + newPartitionOffset.put(partition, newOffset); + } + return newPartitionOffset; + } + + private long getPosition(String topicName, int partition, Order order) { + try (var consumer = createConsumer(topicName, partition, this.config)) { + var topicPartition = new TopicPartition(topicName, partition); + if (Order.NEW_FIRST == order) { + consumer.seekToEnd(List.of(topicPartition)); + } else { + consumer.seekToBeginning(List.of(topicPartition)); + } + return consumer.position(topicPartition); + } + } + + public Map getPagePartitionOffset(String topicName, Collection requestedPartitions, Order order) + throws ExecutionException, InterruptedException { + assertRequestedPartitionsExist(topicName, requestedPartitions); + + var result = new HashMap(); + for (var requestedPartition : requestedPartitions) { + var maxPosition = getPosition(topicName, requestedPartition, order); + result.put(requestedPartition, maxPosition); + } + + return result; + } + + private List> getConsumerRecords(String topicName, Order order, int pageSize, + Collection requestedPartitions, Map start, int totalMessages) { + List> allPartitionsResult = new ArrayList<>(); + + // Requesting a full page from each partition and then filtering out redundant data. Thus, we'll ensure, we read data in historical order. + for (var requestedPartition : requestedPartitions) { + List> partitionResult = new ArrayList<>(); + var offset = start.get(requestedPartition); + try (var consumer = createConsumer(topicName, requestedPartition, this.config)) { + // Move pointer to currently read position. It might be different per partition, so requesting with offset per partition. + var partition = new TopicPartition(topicName, requestedPartition); + + var seekedOffset = Order.OLD_FIRST == order ? offset : Long.max(offset - pageSize, 0); + consumer.seek(partition, seekedOffset); + + var numberOfMessagesReadSoFar = 0; + var keepOnReading = true; + + while (keepOnReading) { + var records = pollWhenReady(consumer); + if (records.isEmpty()) + keepOnReading = false; + + for (var record : records) { + numberOfMessagesReadSoFar++; + partitionResult.add(record); + + if (numberOfMessagesReadSoFar >= totalMessages) { + keepOnReading = false; + break; + } + } + } + // We need to cut off result, if it was reset to 0, as we don't want see entries from old pages. + if (Order.NEW_FIRST == order && seekedOffset == 0 && partitionResult.size() > offset.intValue()) { + partitionResult.sort(Comparator.comparing(ConsumerRecord::timestamp)); + partitionResult = partitionResult.subList(0, offset.intValue()); + } + + } + allPartitionsResult.addAll(partitionResult); + } + return allPartitionsResult; + } + + private void assertRequestedPartitionsExist(String topicName, Collection requestedPartitions) + throws InterruptedException, ExecutionException { + var topicPartitions = partitions(topicName); + + if (!new HashSet<>(topicPartitions).containsAll(requestedPartitions)) { + throw new IllegalArgumentException(String.format( + "Requested messages from partition, that do not exist. Requested partitions: %s. Existing partitions: %s", + requestedPartitions, topicPartitions)); + } + } + + public void createMessage(KafkaMessageCreateRequest request) { + var record = new ProducerRecord<>(request.getTopic(), request.getPartition(), Bytes.wrap(request.getKey().getBytes()), + Bytes.wrap(request.getValue().getBytes()) + //TODO: support headers + ); + + try (var producer = createProducer()) { + producer.send(record); + } + } + public List partitions(String topicName) throws ExecutionException, InterruptedException { return adminClient.describeTopics(List.of(topicName)) .allTopicNames() diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java index 54f18f49a61ab6..99d5b9d550813b 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java @@ -10,6 +10,9 @@ import io.quarkus.arc.Arc; import io.quarkus.kafka.client.runtime.KafkaAdminClient; import io.quarkus.kafka.client.runtime.ui.model.request.KafkaCreateTopicRequest; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessageCreateRequest; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessagesRequest; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaOffsetRequest; import io.quarkus.security.identity.CurrentIdentityAssociation; import io.quarkus.vertx.http.runtime.CurrentVertxRequest; import io.vertx.core.http.HttpServerRequest; @@ -66,6 +69,27 @@ public void handlePost(RoutingContext event) { message = webUtils.toJson(webUtils.getTopics()); res = true; break; + case "topicMessages": + var msgRequest = event.body().asPojo(KafkaMessagesRequest.class); + message = webUtils.toJson(webUtils.getMessages(msgRequest)); + res = true; + break; + case "getOffset": + var request = event.body().asPojo(KafkaOffsetRequest.class); + message = webUtils.toJson(webUtils.getOffset(request)); + res = true; + break; + case "createMessage": + var rq = event.body().asPojo(KafkaMessageCreateRequest.class); + webUtils.createMessage(rq); + message = "{}"; + res = true; + break; + case "getPartitions": + var topicName = body.getString("topicName"); + message = webUtils.toJson(webUtils.partitions(topicName)); + res = true; + break; default: break; } diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java index 31ac51173e07e0..862fdcfeb2d2d2 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java @@ -1,24 +1,31 @@ package io.quarkus.kafka.client.runtime.ui; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import static io.quarkus.kafka.client.runtime.ui.util.ConsumerFactory.createConsumer; + +import java.util.*; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import javax.inject.Singleton; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.quarkus.kafka.client.runtime.KafkaAdminClient; -import io.quarkus.kafka.client.runtime.ui.model.response.KafkaClusterInfo; -import io.quarkus.kafka.client.runtime.ui.model.response.KafkaInfo; -import io.quarkus.kafka.client.runtime.ui.model.response.KafkaNode; -import io.quarkus.kafka.client.runtime.ui.model.response.KafkaTopic; +import io.quarkus.kafka.client.runtime.ui.model.Order; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessageCreateRequest; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessagesRequest; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaOffsetRequest; +import io.quarkus.kafka.client.runtime.ui.model.response.*; +import io.smallrye.common.annotation.Identifier; @Singleton public class KafkaUiUtils { @@ -26,20 +33,24 @@ public class KafkaUiUtils { private final KafkaAdminClient kafkaAdminClient; private final KafkaTopicClient kafkaTopicClient; - private final ObjectMapper objectMapper; - public KafkaUiUtils(KafkaAdminClient kafkaAdminClient, KafkaTopicClient kafkaTopicClient, ObjectMapper objectMapper) { + private final Map config; + + public KafkaUiUtils(KafkaAdminClient kafkaAdminClient, KafkaTopicClient kafkaTopicClient, ObjectMapper objectMapper, + @Identifier("default-kafka-broker") Map config) { this.kafkaAdminClient = kafkaAdminClient; this.kafkaTopicClient = kafkaTopicClient; this.objectMapper = objectMapper; + this.config = config; } public KafkaInfo getKafkaInfo() throws ExecutionException, InterruptedException { var clusterInfo = getClusterInfo(); var broker = clusterInfo.getController().asFullNodeName(); var topics = getTopics(); - return new KafkaInfo(broker, clusterInfo, topics); + var consumerGroups = getConsumerGroups(); + return new KafkaInfo(broker, clusterInfo, topics, consumerGroups); } public KafkaClusterInfo getClusterInfo() throws ExecutionException, InterruptedException { @@ -91,13 +102,93 @@ private KafkaTopic kafkaTopic(TopicListing tl) throws ExecutionException, Interr tl.name(), tl.topicId().toString(), partitions.size(), - tl.isInternal()); + tl.isInternal(), + getTopicMessageCount(tl.name(), partitions)); + } + + public long getTopicMessageCount(String topicName, Collection partitions) + throws ExecutionException, InterruptedException { + var maxPartitionOffsetMap = kafkaTopicClient.getPagePartitionOffset(topicName, partitions, Order.NEW_FIRST); + return maxPartitionOffsetMap.values().stream() + .reduce(Long::sum) + .orElse(0L); } public Collection partitions(String topicName) throws ExecutionException, InterruptedException { return kafkaTopicClient.partitions(topicName); } + public KafkaMessagePage getMessages(KafkaMessagesRequest request) throws ExecutionException, InterruptedException { + return kafkaTopicClient.getTopicMessages(request.getTopicName(), request.getOrder(), request.getPartitionOffset(), + request.getPageSize()); + } + + public void createMessage(KafkaMessageCreateRequest request) { + kafkaTopicClient.createMessage(request); + } + + public List getConsumerGroups() throws InterruptedException, ExecutionException { + List res = new ArrayList<>(); + for (ConsumerGroupDescription cgd : kafkaAdminClient.getConsumerGroups()) { + + var metadata = kafkaAdminClient.listConsumerGroupOffsets(cgd.groupId()) + .partitionsToOffsetAndMetadata().get(); + var members = cgd.members().stream() + .map(member -> new KafkaConsumerGroupMember( + member.consumerId(), + member.clientId(), + member.host(), + getPartitionAssignments(metadata, member))) + .collect(Collectors.toSet()); + + res.add(new KafkaConsumerGroup( + cgd.groupId(), + cgd.state().name(), + cgd.coordinator().host(), + cgd.coordinator().id(), + cgd.partitionAssignor(), + getTotalLag(members), + members)); + } + return res; + } + + private long getTotalLag(Set members) { + return members.stream() + .map(KafkaConsumerGroupMember::getPartitions) + .flatMap(Collection::stream) + .map(KafkaConsumerGroupMemberPartitionAssignment::getLag) + .reduce(Long::sum) + .orElse(0L); + } + + private Set getPartitionAssignments( + Map topicOffsetMap, MemberDescription member) { + var topicPartitions = member.assignment().topicPartitions(); + try (var consumer = createConsumer(topicPartitions, config)) { + var endOffsets = consumer.endOffsets(topicPartitions); + + return topicPartitions.stream() + .map(tp -> { + var topicOffset = Optional.ofNullable(topicOffsetMap.get(tp)) + .map(OffsetAndMetadata::offset) + .orElse(0L); + return new KafkaConsumerGroupMemberPartitionAssignment(tp.partition(), tp.topic(), + getLag(topicOffset, endOffsets.get(tp))); + }) + .collect(Collectors.toSet()); + } + } + + private long getLag(long topicOffset, long endOffset) { + return endOffset - topicOffset; + } + + public Map getOffset(KafkaOffsetRequest request) throws ExecutionException, InterruptedException { + return kafkaTopicClient.getPagePartitionOffset(request.getTopicName(), request.getRequestedPartitions(), + request.getOrder()); + } + public KafkaAclInfo getAclInfo() throws InterruptedException, ExecutionException { var clusterInfo = clusterInfo(kafkaAdminClient.getCluster()); var entries = new ArrayList(); diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/Order.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/Order.java new file mode 100644 index 00000000000000..a94a5565c4a0f0 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/Order.java @@ -0,0 +1,6 @@ +package io.quarkus.kafka.client.runtime.ui.model; + +public enum Order { + OLD_FIRST, + NEW_FIRST +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/converter/KafkaModelConverter.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/converter/KafkaModelConverter.java new file mode 100644 index 00000000000000..5eaad0173d1294 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/converter/KafkaModelConverter.java @@ -0,0 +1,20 @@ +package io.quarkus.kafka.client.runtime.ui.model.converter; + +import java.util.Optional; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; + +import io.quarkus.kafka.client.runtime.ui.model.response.KafkaMessage; + +public class KafkaModelConverter { + public KafkaMessage convert(ConsumerRecord message) { + return new KafkaMessage( + message.topic(), + message.partition(), + message.offset(), + message.timestamp(), + Optional.ofNullable(message.key()).map(Bytes::toString).orElse(null), + Optional.ofNullable(message.value()).map(Bytes::toString).orElse(null)); + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaMessageCreateRequest.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaMessageCreateRequest.java new file mode 100644 index 00000000000000..5dcbebb32fdaac --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaMessageCreateRequest.java @@ -0,0 +1,39 @@ +package io.quarkus.kafka.client.runtime.ui.model.request; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties("action") +public class KafkaMessageCreateRequest { + + //TODO: add headers + private String topic; + private Integer partition; + private String value; + private String key; + + public KafkaMessageCreateRequest() { + } + + public KafkaMessageCreateRequest(String topic, Integer partition, String value, String key) { + this.topic = topic; + this.partition = partition; + this.value = value; + this.key = key; + } + + public String getTopic() { + return topic; + } + + public Integer getPartition() { + return partition; + } + + public String getValue() { + return value; + } + + public String getKey() { + return key; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaMessagesRequest.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaMessagesRequest.java new file mode 100644 index 00000000000000..71fda0e79d8a65 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaMessagesRequest.java @@ -0,0 +1,51 @@ +package io.quarkus.kafka.client.runtime.ui.model.request; + +import java.util.Map; + +import io.quarkus.kafka.client.runtime.ui.model.Order; + +public class KafkaMessagesRequest { + private String topicName; + private Order order; + private int pageSize; + private Integer pageNumber; + + private Map partitionOffset; + + public KafkaMessagesRequest() { + } + + public KafkaMessagesRequest(String topicName, Order order, int pageSize, int pageNumber) { + this.topicName = topicName; + this.order = order; + this.pageSize = pageSize; + this.pageNumber = pageNumber; + } + + public KafkaMessagesRequest(String topicName, Order order, int pageSize, Map partitionOffset) { + this.topicName = topicName; + this.order = order; + this.pageSize = pageSize; + this.partitionOffset = partitionOffset; + } + + public String getTopicName() { + return topicName; + } + + public Order getOrder() { + return order; + } + + public int getPageSize() { + return pageSize; + } + + public int getPageNumber() { + return pageNumber; + } + + public Map getPartitionOffset() { + return partitionOffset; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaOffsetRequest.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaOffsetRequest.java new file mode 100644 index 00000000000000..f9fa52cdb73699 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaOffsetRequest.java @@ -0,0 +1,32 @@ +package io.quarkus.kafka.client.runtime.ui.model.request; + +import java.util.List; + +import io.quarkus.kafka.client.runtime.ui.model.Order; + +public class KafkaOffsetRequest { + private String topicName; + private List requestedPartitions; + private Order order; + + public KafkaOffsetRequest() { + } + + public KafkaOffsetRequest(String topicName, List requestedPartitions, Order order) { + this.topicName = topicName; + this.requestedPartitions = requestedPartitions; + this.order = order; + } + + public String getTopicName() { + return topicName; + } + + public List getRequestedPartitions() { + return requestedPartitions; + } + + public Order getOrder() { + return order; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroup.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroup.java new file mode 100644 index 00000000000000..e6506837534ebd --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroup.java @@ -0,0 +1,56 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +import java.util.Collection; + +public class KafkaConsumerGroup { + private String name; + private String state; + private String coordinatorHost; + private int coordinatorId; + // The assignment strategy + private String protocol; + private long lag; + private Collection members; + + public KafkaConsumerGroup() { + } + + public KafkaConsumerGroup(String name, String state, String coordinatorHost, int coordinatorId, String protocol, long lag, + Collection members) { + this.name = name; + this.state = state; + this.coordinatorHost = coordinatorHost; + this.coordinatorId = coordinatorId; + this.protocol = protocol; + this.lag = lag; + this.members = members; + } + + public String getName() { + return name; + } + + public String getState() { + return state; + } + + public String getCoordinatorHost() { + return coordinatorHost; + } + + public int getCoordinatorId() { + return coordinatorId; + } + + public String getProtocol() { + return protocol; + } + + public long getLag() { + return lag; + } + + public Collection getMembers() { + return members; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroupMember.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroupMember.java new file mode 100644 index 00000000000000..338890414b702f --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroupMember.java @@ -0,0 +1,38 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +import java.util.Collection; + +public class KafkaConsumerGroupMember { + private String memberId; + private String clientId; + private String host; + + private Collection partitions; + + public KafkaConsumerGroupMember() { + } + + public KafkaConsumerGroupMember(String memberId, String clientId, String host, + Collection partitions) { + this.memberId = memberId; + this.clientId = clientId; + this.host = host; + this.partitions = partitions; + } + + public String getMemberId() { + return memberId; + } + + public String getClientId() { + return clientId; + } + + public String getHost() { + return host; + } + + public Collection getPartitions() { + return partitions; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroupMemberPartitionAssignment.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroupMemberPartitionAssignment.java new file mode 100644 index 00000000000000..4a722e76d6385d --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroupMemberPartitionAssignment.java @@ -0,0 +1,29 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +public class KafkaConsumerGroupMemberPartitionAssignment { + + private int partition; + private String topic; + private long lag; + + public KafkaConsumerGroupMemberPartitionAssignment() { + } + + public KafkaConsumerGroupMemberPartitionAssignment(int partition, String topic, long lag) { + this.partition = partition; + this.topic = topic; + this.lag = lag; + } + + public int getPartition() { + return partition; + } + + public String getTopic() { + return topic; + } + + public long getLag() { + return lag; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaInfo.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaInfo.java index d095170b8bdf8c..f8a63d09638f55 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaInfo.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaInfo.java @@ -6,14 +6,17 @@ public class KafkaInfo { private String broker; private KafkaClusterInfo clusterInfo; private List topics; + private List consumerGroups; public KafkaInfo() { } - public KafkaInfo(String broker, KafkaClusterInfo clusterInfo, List topics) { + public KafkaInfo(String broker, KafkaClusterInfo clusterInfo, List topics, + List consumerGroups) { this.broker = broker; this.clusterInfo = clusterInfo; this.topics = topics; + this.consumerGroups = consumerGroups; } public String getBroker() { @@ -28,4 +31,7 @@ public KafkaClusterInfo getClusterInfo() { return clusterInfo; } + public List getConsumerGroups() { + return consumerGroups; + } } diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaMessage.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaMessage.java new file mode 100644 index 00000000000000..4b4e246994a94d --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaMessage.java @@ -0,0 +1,43 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +public class KafkaMessage { + private final String topic; + private final int partition; + private final long offset; + private final long timestamp; + private final String key; + private final String value; + + public KafkaMessage(String topic, int partition, long offset, long timestamp, String key, String value) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.timestamp = timestamp; + this.key = key; + this.value = value; + } + + public String getTopic() { + return topic; + } + + public int getPartition() { + return partition; + } + + public long getOffset() { + return offset; + } + + public long getTimestamp() { + return timestamp; + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaMessagePage.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaMessagePage.java new file mode 100644 index 00000000000000..c57aaa6ce51780 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaMessagePage.java @@ -0,0 +1,22 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +import java.util.Collection; +import java.util.Map; + +public class KafkaMessagePage { + private final Map nextOffsets; + private final Collection messages; + + public KafkaMessagePage(Map nextOffsets, Collection messages) { + this.nextOffsets = nextOffsets; + this.messages = messages; + } + + public Map getNextOffsets() { + return nextOffsets; + } + + public Collection getMessages() { + return messages; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaTopic.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaTopic.java index b678b50afc3444..ab5595d7a84888 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaTopic.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaTopic.java @@ -5,15 +5,17 @@ public class KafkaTopic { private String topicId; private int partitionsCount; private boolean internal; + private long nmsg = 0; public KafkaTopic() { } - public KafkaTopic(String name, String topicId, int partitionsCount, boolean internal) { + public KafkaTopic(String name, String topicId, int partitionsCount, boolean internal, long nmsg) { this.name = name; this.topicId = topicId; this.partitionsCount = partitionsCount; this.internal = internal; + this.nmsg = nmsg; } public String getName() { @@ -32,6 +34,10 @@ public boolean isInternal() { return internal; } + public long getNmsg() { + return nmsg; + } + public String toString() { StringBuilder sb = new StringBuilder(name); sb.append(" : ").append(topicId); diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/util/ConsumerFactory.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/util/ConsumerFactory.java new file mode 100644 index 00000000000000..be2c1405308606 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/util/ConsumerFactory.java @@ -0,0 +1,37 @@ +package io.quarkus.kafka.client.runtime.ui.util; + +import java.util.*; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.utils.Bytes; + +public class ConsumerFactory { + + public static Consumer createConsumer(String topicName, Integer requestedPartition, + Map commonConfig) { + return createConsumer(List.of(new TopicPartition(topicName, requestedPartition)), commonConfig); + } + + // We must create a new instance per request, as we might have multiple windows open, each with different pagination, filter and thus different cursor. + public static Consumer createConsumer(Collection requestedPartitions, + Map commonConfig) { + Map config = new HashMap<>(commonConfig); + //TODO: make generic? + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + + config.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID()); + + // For pagination, we require manual management of offset pointer. + config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + var consumer = new KafkaConsumer(config); + consumer.assign(requestedPartitions); + return consumer; + } + +} diff --git a/extensions/kafka-client/ui/src/main/webapp/config.js b/extensions/kafka-client/ui/src/main/webapp/config.js new file mode 100644 index 00000000000000..1c141c4c1e97c7 --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/config.js @@ -0,0 +1,4 @@ +export const api = '/kafka-admin'; +export const logo = 'quarkus_icon_rgb_reverse.svg'; +export const faviconLogo = 'favicon.ico'; +export const ui = ''; \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/index.html b/extensions/kafka-client/ui/src/main/webapp/index.html index 975d965858710f..181056ca3ead08 100644 --- a/extensions/kafka-client/ui/src/main/webapp/index.html +++ b/extensions/kafka-client/ui/src/main/webapp/index.html @@ -35,6 +35,86 @@ min-height: 90vh; } + .link { + background: none; + border: none; + } + + .top-margin { + margin-top: 1em; + } + + .left-margin { + margin-left: 1em; + } + + .left-padding { + padding-left: 1em; + } + + .shown { + display: flex; + height: auto; + min-width: 100%; + } + + .text-shown { + display: inline; + } + + .hidden { + display: none + } + + .nav-item:hover > .nav-row > a { + background-color: #005fff; + color: #e9ecef; + } + + .nav-item:hover > .nav-row > i { + background-color: #005fff; + color: #e9ecef; + } + + #navbar-list > .nav-item:hover { + background-color: #005fff; + color: #e9ecef; + } + + .table-hover:hover { + cursor: pointer; + } + + .multiselect-container > li > a > label { + padding-left: 15px !important; + } + + .page { + min-height: calc(100vh - 135px); + } + + .table-hover:hover { + cursor: pointer; + } + + .pointer { + cursor: pointer; + } + + .no-hover { + background-color: white; + cursor: default; + } + + .no-hover:hover { + background-color: white !important; + cursor: default; + } + + .icon-rotated { + transform: rotate(90deg); + } + .navbar-brand img { border-right: 1px solid darkgrey; padding-right: 10px; @@ -50,6 +130,57 @@ padding: 0px; } + .float-plus-btn { + position: fixed; + bottom: 60px; + right: 60px; + border-radius: 100%; + height: 50px; + width: 50px; + } + + .breadcrumb-item::before { + float: left; + padding-right: 0.5rem; + color: #007bff; + content: "〉"; + } + + .breadcrumb-item + .breadcrumb-item::before { + float: left; + padding-right: 0.5rem; + color: #007bff; + content: "〉"; + } + + .breadcrumb { + background-color: #343a40; + margin-bottom: 0; + padding: 0 0 0 5px; + } + + .bi-trash-fill:hover { + color: #007bff; + } + + .collapse-content { + max-width: 1200px; + } + + .thead-multiselect { + background-color: #343a40; + color: white; + border: 0px; + font-weight: bold; + } + + .thead-text { + color: white; + } + + #msg-table-holder { + min-width: 100%; + } @@ -64,6 +195,194 @@
+ + + + -
- Schema registry is not implemeted yet.
-
@@ -173,30 +489,6 @@
-
-
-
- Kafka cluster id: 
- Controller node (broker): 
- ACL operations: 
-
-
-

Access Control Lists

-
- - - - - - - - - - - -
OperationPrinicipalPermissionsPattern
-
-
@@ -220,11 +512,19 @@

Cluster nodes

+
+ + + diff --git a/extensions/kafka-client/ui/src/main/webapp/kafka_ui.js b/extensions/kafka-client/ui/src/main/webapp/kafka_ui.js new file mode 100644 index 00000000000000..c07fdfd3782174 --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/kafka_ui.js @@ -0,0 +1,11 @@ +import Navigator from './pages/navigator.js' +import {setLogo} from "./util/logo.js"; + +const navigator = new Navigator(); +$(document).ready( + () => { + setLogo(); + navigator.navigateToDefaultPage(); + } +); + diff --git a/extensions/kafka-client/ui/src/main/webapp/pages/consumerGroupDetailsPage.js b/extensions/kafka-client/ui/src/main/webapp/pages/consumerGroupDetailsPage.js new file mode 100644 index 00000000000000..cfd39d4d8be2d4 --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/pages/consumerGroupDetailsPage.js @@ -0,0 +1,86 @@ +import {CollapseRow, createTableHead, createTableItem, createTableItemHtml} from "../util/contentManagement.js"; + +export default class ConsumerGroupDetailsPage { + constructor(containerId) { + this.containerId = containerId; + Object.getOwnPropertyNames(ConsumerGroupDetailsPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + open(params) { + const membersData = params[1]; + let consumerGroupsTable = $('#consumer-group-details-table tbody'); + consumerGroupsTable.empty(); + for (let i = 0; i < membersData.length; i++) { + const d = membersData[i]; + const groupId = "group-" + window.crypto.randomUUID(); + + let tableRow = $(""); + let collapseRow; + if (d.partitions.length > 0) { + collapseRow = new CollapseRow(groupId); + tableRow.append(createTableItemHtml(collapseRow.arrow)); + } else { + tableRow.append(createTableItem("")); + } + + const memberId = $("") + .text(d.clientId); + const id = d.memberId.substring(d.clientId.length); + const text = $("

") + .append(memberId) + .append(id); + tableRow.append(createTableItemHtml(text)); + tableRow.append(createTableItem(d.host)); + tableRow.append(createTableItem("" + new Set(d.partitions.map(x => x.partition)).size)); + tableRow.append(createTableItem("" + d.partitions.map(x => x.lag).reduce((l, r) => l + r, 0))); + consumerGroupsTable.append(tableRow); + + if (d.partitions.length > 0) { + const content = this.createConsumerGroupCollapseInfo(d); + tableRow + .addClass("pointer") + .click(collapseRow.collapse); + consumerGroupsTable.append( + collapseRow + .getCollapseContent(tableRow.children().length, content) + .addClass("no-hover") + ); + } + } + } + + createConsumerGroupCollapseInfo(dataItem) { + const collapseContent = $("") + .addClass("table") + .addClass("table-sm") + .addClass("no-hover"); + + const headers = $("") + .addClass("no-hover") + .append(createTableHead("Topic")) + .append(createTableHead("Partition")) + .append(createTableHead("Lag")); + const head = $("") + .append(headers); + + const body = $(""); + for (let partition of dataItem.partitions) { + const row = $("") + .addClass("no-hover"); + row.append(createTableItemHtml(partition.topic)) + row.append(createTableItemHtml(partition.partition)) + row.append(createTableItemHtml(partition.lag)) + body.append(row); + } + + collapseContent.append(head); + collapseContent.append(body); + + return collapseContent; + } + +} \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/pages/consumerGroupPage.js b/extensions/kafka-client/ui/src/main/webapp/pages/consumerGroupPage.js new file mode 100644 index 00000000000000..4a8500fab0c0e0 --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/pages/consumerGroupPage.js @@ -0,0 +1,48 @@ +import {createTableItem} from "../util/contentManagement.js"; +import {doPost, errorPopUp} from "../web/web.js"; +import {pages} from "./navigator.js"; +import {toggleSpinner} from "../util/spinner.js"; + +export default class ConsumerGroupPage { + constructor(navigator, containerId) { + this.containerId = containerId; + this.navigator = navigator; + Object.getOwnPropertyNames(ConsumerGroupPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + open() { + toggleSpinner(this.containerId); + const req = { + action: "getInfo", key: "0", value: "0" + }; + doPost(req, (data) => { + this.updateConsumerGroups(data.consumerGroups); + toggleSpinner(this.containerId); + }, data => { + errorPopUp("Error getting Kafka info: ", data); + toggleSpinner(this.containerId); + }); + } + + updateConsumerGroups(data) { + let consumerGroupsTable = $('#consumer-groups-table tbody'); + consumerGroupsTable.empty(); + for (let i = 0; i < data.length; i++) { + const d = data[i]; + let tableRow = $(""); + tableRow.append(createTableItem(d.state)); + tableRow.append(createTableItem(d.name)); + tableRow.append(createTableItem(d.coordinatorId)); + tableRow.append(createTableItem(d.protocol)); + tableRow.append(createTableItem(d.members.length)); + tableRow.append(createTableItem(d.lag)); + const self = this; + tableRow.click(() => self.navigator.navigateTo(pages.CONSUMER_GROUPS_DETAILS, [d.name, d.members])); + consumerGroupsTable.append(tableRow); + } + } +} \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/pages/messagesPage.js b/extensions/kafka-client/ui/src/main/webapp/pages/messagesPage.js new file mode 100644 index 00000000000000..f009847be3826e --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/pages/messagesPage.js @@ -0,0 +1,449 @@ +import {doPost, errorPopUp} from "../web/web.js"; +import timestampToFormattedString from "../util/datetimeUtil.js"; +import {CollapseRow, createTableItem, createTableItemHtml} from "../util/contentManagement.js"; +import {toggleSpinner} from "../util/spinner.js"; + +const MODAL_KEY_TAB = "header-key-tab-pane"; +const PAGE_SIZE = 20; +const NEW_FIRST = "NEW_FIRST"; +const OLD_FIRST = "OLD_FIRST"; +const MESSAGES_SPINNER = "message-load-spinner"; +const MESSAGES_TABLE_BODY = "msg-table-body"; +const MESSAGES_TABLE_HOLDER = "msg-table-holder"; + +export default class MessagesPage { + constructor(containerId) { + this.containerId = containerId; + this.registerButtonHandlers(); + Object.getOwnPropertyNames(MessagesPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + registerButtonHandlers() { + $("#open-create-msg-modal-btn").click(() => { + $('#create-msg-modal').modal('show'); + this.setActiveTab(MODAL_KEY_TAB); + }); + + $('#send-msg-btn').click(this.createMessage.bind(this)); + + $('.close-modal-btn').click(() => { + $('.modal').modal('hide'); + this.setActiveTab(MODAL_KEY_TAB); + }); + + $('#msg-page-partition-select').multiselect({ + buttonClass: 'thead-multiselect', + includeSelectAllOption: true, + filterPlaceholder: 'Partitions', + selectAllText: 'Select All', + nonSelectedText: 'Partitions', + buttonText: function () { + return 'Partitions'; + } + }); + + $("#timestamp-sort-header").click(() => { + this.toggleSorting(); + window.currentContext.currentPage = 1; + this.loadMessages(); + }); + + $("#msg-page-partition-select").change(() => { + window.currentContext.currentPage = 1; + this.loadMessages(); + }); + + $(".previous").click(() => { + if (window.currentContext.currentPage === 1) return; + window.currentContext.currentPage = window.currentContext.currentPage - 1; + this.loadMessages(); + }) + + $(".next").click(() => { + if (window.currentContext.currentPage === this.getMaxPageNumber()) return; + window.currentContext.currentPage = window.currentContext.currentPage + 1; + this.loadMessages(); + }) + + $("#reload-msg-btn").click(() => { + currentContext.pagesCache = new Map(); + this.loadMessages(); + }); + } + + toggleSorting() { + if (currentContext.currentSorting === NEW_FIRST) { + currentContext.currentSorting = OLD_FIRST; + $("#timestamp-sort-icon") + .removeClass("bi-chevron-double-down") + .addClass("bi-chevron-double-up"); + } else { + currentContext.currentSorting = NEW_FIRST; + $("#timestamp-sort-icon") + .addClass("bi-chevron-double-down") + .removeClass("bi-chevron-double-up"); + } + } + + loadMessages() { + toggleSpinner(MESSAGES_TABLE_HOLDER, MESSAGES_SPINNER); + this.getPage(currentContext.currentPage, this.onMessagesLoaded, this.onMessagesFailed); + this.redrawPageNav(); + } + + open(params) { + toggleSpinner(MESSAGES_TABLE_HOLDER, MESSAGES_SPINNER); + const topicName = params[0]; + window.currentContext = { + topicName: topicName, + currentPage: 1, //always start with first page + pagesCache: new Map(), + currentSorting: NEW_FIRST + }; + + this.clearMessageTable(); + + new Promise((resolve, reject) => { + this.requestPartitions(topicName, resolve, reject); + }).then((data) => { + this.onPartitionsLoaded(data); + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + this.loadMaxPageNumber(); + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + this.getPage(currentContext.currentPage, this.onMessagesLoaded, this.onMessagesFailed); + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }) + .catch(() => errorPopUp("Failed loading page.")); + } + + // Key format: ORDER-partition1-partition2-...-partitionN-pageNumber. Like: NEW_FIRST-0-1-17 + generateCacheKey(pageNumber) { + const order = this.getOrder(); + const partitions = this.getPartitions(); + const partitionsKeyPart = partitions.reduce((partialKey, str) => partialKey + "-" + str, 0); + + return order + partitionsKeyPart + "-" + pageNumber; + } + + requestPartitions(topicName, onPartitionsLoaded, onPartitionsFailed) { + const rq = { + action: "getPartitions", topicName: topicName + } + + doPost(rq, onPartitionsLoaded, onPartitionsFailed); + } + + onPartitionsLoaded(data) { + let msgModalPartitionSelect = $('#msg-modal-partition-select'); + let msgPagePartitionSelect = $('#msg-page-partition-select'); + msgModalPartitionSelect.empty(); + msgPagePartitionSelect.empty(); + + msgModalPartitionSelect.append($(""); + const groupId = "group-" + window.crypto.randomUUID(); + const collapseRow = new CollapseRow(groupId); + tableRow.append(createTableItemHtml(collapseRow.arrow)); + + tableRow.append(createTableItem(messages[i].offset)); + tableRow.append(createTableItem(messages[i].partition)); + tableRow.append(createTableItem(timestampToFormattedString(messages[i].timestamp))); + tableRow.append(createTableItem(messages[i].key)); + + const value = messages[i].value; + const maxMsgLength = 75; + if (value.length < maxMsgLength) { + tableRow.append(createTableItem(value)); + } else { + tableRow.append(createTableItem(value.slice(0, maxMsgLength) + "...")); + } + tableRow.append(createTableItem()); + tableRow + .addClass("pointer") + .click(collapseRow.collapse); + msgTableBody.append(tableRow); + msgTableBody.append(collapseRow.getCollapseContent(tableRow.children().length, this.createMessageCollapseItem(value))); + } + + currentContext.lastOffset = data.partitionOffset; + toggleSpinner(MESSAGES_TABLE_HOLDER, MESSAGES_SPINNER); + } + + createMessageCollapseItem(fullMessage) { + return $("
") + .text(fullMessage); + } + + toggleContent() { + return (event) => { + const textBlock = $(event.target); + const dots = textBlock.find(".dots"); + const hiddenText = textBlock.find(".hidden-text"); + + if (dots.hasClass("hidden")) { + dots.removeClass("hidden"); + dots.addClass("text-shown"); + hiddenText.removeClass("text-shown"); + hiddenText.addClass("hidden"); + } else { + dots.removeClass("text-shown"); + dots.addClass("hidden"); + hiddenText.removeClass("hidden"); + hiddenText.addClass("text-shown"); + } + }; + } + + onMessagesFailed(data, errorType, error) { + console.error("Error getting topic messages"); + } + + requestCreateMessage() { + const topicName = currentContext.topicName; + let partition = $('#msg-modal-partition-select option:selected').val(); + if (partition === 'any') partition = null; + + let valueTextarea = $('#msg-value-textarea'); + let keyTextarea = $('#msg-key-textarea'); + const rq = { + action: "createMessage", + topic: topicName, + partition: partition, + value: valueTextarea.val(), + key: keyTextarea.val() + }; + + // TODO: print out partitions count on topics page + doPost(rq, data => { + currentContext.pagesCache = new Map(); + new Promise(this.loadMaxPageNumber) + .then(this.loadMessages) + .catch(() => errorPopUp("Failed")); + }, (data, errorType, error) => { + errorPopUp("Failed to reload messages."); + }); + } + + setActiveTab(tab) { + $('.nav-tabs button[href="#' + tab + '"]').click(); + }; + + createMessage() { + this.requestCreateMessage(); + + // Clean inputs for future reuse of modal. + $('#create-msg-modal').modal('hide'); + $('#msg-value-textarea').val(""); + $('#msg-key-textarea').val(""); + $('#msg-modal-partition-select').val("any"); + $('#msg-modal-type-select').val("text"); + + $('body').removeClass('modal-open'); + $('.modal-backdrop').remove(); + + this.setActiveTab(MODAL_KEY_TAB); + } + + clearMessageTable() { + $('#msg-table-body').empty(); + } + + redrawPageNav() { + //TODO: add GOTO page input + const previous = $(".previous"); + const next = $(".next"); + + previous.removeClass("disabled"); + next.removeClass("disabled"); + + const maxPageNumber = this.getMaxPageNumber(); + const currentPage = currentContext.currentPage; + let pages = [currentPage]; + + if (currentPage > 1) { + pages.unshift(currentPage - 1); + } + if (currentPage < maxPageNumber) { + pages.push(currentPage + 1); + } + + if (currentPage === 1) { + previous.addClass("disabled"); + if (maxPageNumber > 2) { + pages.push(currentPage + 2); + } + } + if (currentPage === maxPageNumber) { + next.addClass("disabled"); + if (maxPageNumber > 2) { + pages.unshift(currentPage - 2); + } + } + + const pagination = $("#msg-pagination"); + + // Remove all page children numbers. + while (pagination.children().length !== 2) { + pagination.children()[1].remove(); + } + + for (const p of pages) { + let a = $("") + .text("" + p) + .addClass("page-link"); + let li = $("
  • ") + .addClass("page-item") + .click(() => { + toggleSpinner(MESSAGES_TABLE_HOLDER, MESSAGES_SPINNER); + currentContext.currentPage = p; + this.getPage(p, this.onMessagesLoaded, this.onMessagesFailed); + this.redrawPageNav(); + }); + + if (p === currentPage) { + li.addClass("active"); + } + li.append(a); + + const lastPosition = pagination.children().length - 1; + li.insertBefore(".next"); + } + } + + requestOffset(topicName, order, onOffsetLoaded, onOffsetFailed, partitions) { + const req = { + action: "getOffset", + topicName: topicName, + order: order, + requestedPartitions: partitions === undefined ? this.getPartitions() : partitions + }; + doPost(req, onOffsetLoaded, onOffsetFailed); + } + + // TODO: add possibility to hide panel on the left + loadMaxPageNumber() { + const partitions = this.getPartitions(); + this.requestOffset( + currentContext.topicName, + NEW_FIRST, + (data) => { + currentContext.partitionOffset = new Map( + Object.entries(data).map(x => [parseInt(x[0]), x[1]]) + ); + this.redrawPageNav(); + }, + (data, errorType, error) => { + console.error("Error getting max page number."); + }, + partitions + ); + } + + getMaxPageNumber() { + const partitions = this.getPartitions(); + const totalElements = partitions.map(x => { + const a = currentContext.partitionOffset.get(x) + return a; + }) + .reduce((partialSum, a) => partialSum + a, 0); + return Math.max(Math.ceil(totalElements / PAGE_SIZE), 1); + } + + getOrder() { + return currentContext.currentSorting; + } + +} \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/pages/navigator.js b/extensions/kafka-client/ui/src/main/webapp/pages/navigator.js new file mode 100644 index 00000000000000..cd5e66615a4aa7 --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/pages/navigator.js @@ -0,0 +1,175 @@ +import MessagesPage from "./messagesPage.js"; +import TopicsPage from "./topicsPage.js"; +import ConsumerGroupPage from "./consumerGroupPage.js"; +import ConsumerGroupDetailsPage from "./consumerGroupDetailsPage.js"; +import AccessControlListPage from "./accessControlListPage.js"; +import NodesPage from "./nodesPage.js"; +import {createIcon} from "../util/contentManagement.js"; + +export const pages = { + TOPICS: "topics-page", + SCHEMA: "schema-page", + CONSUMER_GROUPS: "consumer-groups-page", + CONSUMER_GROUPS_DETAILS: "consumer-groups-details-page", + ACCESS_CONTROL_LIST: "access-control-list-page", + NODES: "nodes-page", + TOPIC_MESSAGES: "topic-messages-page", + DEFAULT: "topics-page" +} + +export default class Navigator { + constructor() { + this.registerNavbar(); + } + + allPages = { + [pages.TOPICS]: { + header: "Topics", + showInNavbar: true, + instance: new TopicsPage(this, pages.TOPICS), + icon: "bi-collection" + }, + [pages.SCHEMA]: { + header: "Schema registry", + showInNavbar: true, + icon: "bi-file-code" + }, + [pages.CONSUMER_GROUPS]: { + header: "Consumer groups", + showInNavbar: true, + instance: new ConsumerGroupPage(this, pages.CONSUMER_GROUPS), + icon: "bi-inboxes" + }, + [pages.ACCESS_CONTROL_LIST]: { + header: "Access control list", + showInNavbar: true, + instance: new AccessControlListPage(pages.ACCESS_CONTROL_LIST), + icon: "bi-shield-lock" + }, + [pages.NODES]: { + header: "Nodes", + showInNavbar: true, + instance: new NodesPage(pages.NODES), + icon: "bi-diagram-3" + }, + [pages.TOPIC_MESSAGES]: { + header: "Messages", + showInNavbar: false, + instance: new MessagesPage(pages.TOPIC_MESSAGES), + parent: pages.TOPICS + }, + [pages.CONSUMER_GROUPS_DETAILS]: { + header: "Consumer group details", + showInNavbar: false, + instance: new ConsumerGroupDetailsPage(pages.CONSUMER_GROUPS_DETAILS), + parent: pages.CONSUMER_GROUPS + } + }; + + registerNavbar() { + const keys = Object.keys(this.allPages); + const navbar = $("#navbar-list"); + navbar.empty(); + + for (let i = 0; i < keys.length; i++) { + const key = keys[i]; + const value = this.allPages[key]; + if (!value.showInNavbar) continue; + const navItem = $("
  • ") + .addClass("nav-item") + .addClass("left-padding") + .addClass("pointer"); + + const navHolder = $("
    ") + .addClass("d-flex") + .addClass("left-margin") + .addClass("nav-row") + .click(() => this.navigateTo(key)); + + const icon = createIcon(value.icon) + .addClass("align-self-center"); + const navLink = $("", { + text: value.header, + href: "#" + }) + .addClass("nav-link") + .addClass("active") + .addClass("link"); + navHolder.append(icon); + navHolder.append(navLink); + navItem.append(navHolder); + navbar.append(navItem); + } + } + + navigateTo(requestedPage, params) { + const keys = Object.keys(this.allPages); + for (let i = 0; i < keys.length; i++) { + const elementName = keys[i]; + const d = $("#" + elementName); + if (d !== null) { + if (elementName !== requestedPage) { + d.removeClass("shown") + .addClass("hidden"); + } else { + d.removeClass("hidden") + .addClass("shown"); + this.open(requestedPage, params); + } + } else { + console.error("Can not find page div: ", keys[i]); + } + } + + this.navigateBreadcrumb(requestedPage, params); + } + + navigateToDefaultPage() { + this.navigateTo(pages.DEFAULT); + } + + open(pageId, params) { + const value = this.allPages[pageId]; + value.instance.open(params); + } + + navigateBreadcrumb(page, params) { + const breadcrumb = $("#nav-breadcrumb"); + breadcrumb.empty(); + + let nextPage = this.allPages[page]; + let pageId = page; + + let i = 0; + while (nextPage !== undefined) { + let li; + // We only need to append possible params to the very first element. + if (i === 0) { + li = this.createBreadcrumbItem(nextPage.header, pageId, true, params); + } else { + li = this.createBreadcrumbItem(nextPage.header, pageId, false); + } + breadcrumb.prepend(li); + pageId = nextPage.parent; + nextPage = this.allPages[pageId]; + i++; + } + } + + createBreadcrumbItem(text, pageId, isActive, params) { + let breadcrumbText = text; + if (params !== undefined && params.length > 0 && (params[0] !== null && params[0] !== undefined)) { + breadcrumbText = text + " (" + params[0] + ")"; + } + const a = $("", {href: "#", text: breadcrumbText}) + .click(() => this.navigateTo(pageId, params)); + if (isActive) { + a.addClass("active"); + } + + const li = $("
  • ") + .addClass("breadcrumb-item"); + li.append(a); + return li; + } +} \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/pages/topicsPage.js b/extensions/kafka-client/ui/src/main/webapp/pages/topicsPage.js new file mode 100644 index 00000000000000..28156717578c8b --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/pages/topicsPage.js @@ -0,0 +1,188 @@ +import {doPost, errorPopUp} from "../web/web.js"; +import {createIcon, createTableItem, createTableItemHtml, hideItem, showItem} from "../util/contentManagement.js"; +import {pages} from "./navigator.js"; + +export default class TopicsPage { + constructor(navigator, containerId) { + this.navigator = navigator; + this.containerId = containerId; + this.registerButtonHandlers(); + + // TODO: move to common function with comment + Object.getOwnPropertyNames(TopicsPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + open() { + window.currentContext = {}; + this.requestTopics(this.onTopicsLoaded, this.onTopicsFailed); + } + + registerButtonHandlers() { + + const topicNameInput = $("#topic-name-modal-input"); + $("#create-topic-btn").click(() => { + if (!this.validateTopicName(topicNameInput.val())) { + this.showErrorIfInvalid(topicNameInput.val(), this.validateTopicName, topicNameValidationErrorBox); + return; + } + + this.createTopic(this.onTopicsLoaded, this.onTopicsFailed); + $('#create-topic-modal').modal('hide'); + $('#topic-name-modal-input').val(""); + $('#partitions-modal-input').val(""); + $('#replications-modal-input').val(""); + }) + + $("#open-create-topic-modal-btn").click(() => { + this.loadNodesCount(); + $('#create-topic-modal').modal('show'); + }); + + $('.close-modal-btn').click(() => { + hideItem($(".modal")); + hideItem($("#topic-creation-validation-msg-box")); + hideItem($("#topic-name-validation-msg")); + hideItem($("#replication-validation-msg")); + }); + + $("#delete-topic-btn").click(() => { + const currentTopic = window.currentContext.topicName; + this.deleteTopic(currentTopic, this.deleteTopicRow, this.onTopicsFailed) + $("#delete-topic-modal").modal("hide"); + }); + + const topicNameValidationErrorBox = $("#topic-name-validation-msg"); + topicNameInput.keyup(() => this.showErrorIfInvalid(topicNameInput.val(), this.validateTopicName, topicNameValidationErrorBox)); + topicNameInput.change(() => this.showErrorIfInvalid(topicNameInput.val(), this.validateTopicName, topicNameValidationErrorBox)); + + const replicationInput = $("#replications-modal-input"); + replicationInput.keyup(() => { + const value = replicationInput.val(); + this.showErrorIfInvalid(value, this.validateReplicationFactor, $("#replication-validation-msg")); + }); + } + + loadNodesCount() { + const req = { + action: "getInfo" + }; + doPost(req, (data) => { + window.currentContext.nodesCount = data.clusterInfo.nodes.length; + }, data => { + errorPopUp("Could not obtain nodes count."); + }); + } + + showErrorIfInvalid(value, validationFunction, errBoxSelector) { + const valid = validationFunction(value); + if (!valid) { + showItem($("#topic-creation-validation-msg-box")); + showItem(errBoxSelector); + $("#create-topic-btn") + .addClass("disabled") + .attr("disabled", true); + } else { + hideItem(errBoxSelector); + const topicMsgValidationBoxChildren = $("#topic-creation-validation-msg-box span"); + const allChildrenHidden = topicMsgValidationBoxChildren + .filter((x) => !$(x).hasClass("hidden")) + .length > 0; + if (allChildrenHidden) { + hideItem($("#topic-creation-validation-msg-box")); + $("#create-topic-btn") + .removeClass("disabled") + .attr("disabled", false); + } + } + } + + validateTopicName(name) { + const legalChars = /^[a-zA-Z\d\.\_]+$/; + const maxNameLength = 255; + return legalChars.test(name) && name.length < maxNameLength; + } + + validateReplicationFactor(replicationFactor) { + return currentContext.nodesCount >= replicationFactor; + } + + requestTopics(onTopicsLoaded, onTopicsFailed) { + const req = { + action: "getTopics" + }; + doPost(req, onTopicsLoaded, onTopicsFailed); + } + + onTopicsLoaded(data) { + let tableBody = $('#topics-table tbody'); + tableBody.empty(); + + for (let i = 0; i < data.length; i++) { + let tableRow = $("
  • "); + let d = data[i]; + tableRow.append(createTableItem(d.name)); + tableRow.append(createTableItem(d.topicId)); + tableRow.append(createTableItem(d.partitionsCount)); + tableRow.append(createTableItem(("" + d.nmsg))); + + const deleteIcon = createIcon("bi-trash-fill"); + const deleteBtn = $("") + .addClass("btn") + .click((event) => { + window.currentContext.topicName = d.name; + $("#delete-topic-modal").modal("show"); + $("#delete-topic-name-span").text(d.name); + event.stopPropagation(); + }) + .append(deleteIcon); + + + tableRow.click(() => { + self.navigator.navigateTo(pages.TOPIC_MESSAGES, [d.name]); + }); + const controlHolder = $("
    ") + .append(deleteBtn); + tableRow.append(createTableItemHtml(controlHolder)); + + const self = this; + + tableBody.append(tableRow); + } + } + + onTopicsFailed(data) { + errorPopUp("Error getting topics: ", data); + } + + createTopic(onTopicsLoaded, onTopicsFailed) { + const topicName = $("#topic-name-modal-input").val(); + const partitions = $("#partitions-modal-input").val(); + const replications = $("#replications-modal-input").val(); + + const req = { + action: "createTopic", + topicName: topicName, + partitions: partitions, + replications: replications + }; + doPost(req, () => this.requestTopics(this.onTopicsLoaded, this.onTopicsFailed), onTopicsFailed); + } + + // TODO: add pagination here + deleteTopic(topicName, onTopicsDeleted, onTopicsFailed) { + const req = { + action: "deleteTopic", + key: topicName + }; + doPost(req, onTopicsDeleted, onTopicsFailed); + } + + deleteTopicRow(data) { + const topicName = window.currentContext.topicName; + $("#topics-table > tbody > tr > td:contains('" + topicName + "')").parent().remove() + } +} \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/util/contentManagement.js b/extensions/kafka-client/ui/src/main/webapp/util/contentManagement.js index 50df67b1ccef4f..d9f87034ab8dda 100644 --- a/extensions/kafka-client/ui/src/main/webapp/util/contentManagement.js +++ b/extensions/kafka-client/ui/src/main/webapp/util/contentManagement.js @@ -4,12 +4,58 @@ export function createTableItem(text) { }); } +export function createTableItemHtml(html) { + return $("
    ").append(createTableItemHtml( + collapseContent + .addClass("collapse-content")) + .attr("colspan", tableWidth)) + .attr("id", this.collapseId) + .addClass("collapse"); + } + + collapse() { + $("#" + this.collapseId).toggle(); + if (this.arrow.hasClass("icon-rotated")) { + this.arrow.removeClass("icon-rotated"); + } else { + this.arrow.addClass("icon-rotated"); + } + } +} + export function showItem(selector){ selector.addClass("shown") .removeClass("hidden"); diff --git a/extensions/kafka-client/ui/src/main/webapp/util/datetimeUtil.js b/extensions/kafka-client/ui/src/main/webapp/util/datetimeUtil.js new file mode 100644 index 00000000000000..384d693bde8619 --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/util/datetimeUtil.js @@ -0,0 +1,17 @@ +function addTrailingZero(data) { + if (data < 10) { + return "0" + data; + } + return data; +} + +export default function timestampToFormattedString(UNIX_timestamp) { + const a = new Date(UNIX_timestamp); + const year = a.getFullYear(); + const month = addTrailingZero(a.getMonth()); + const date = addTrailingZero(a.getDate()); + const hour = addTrailingZero(a.getHours()); + const min = addTrailingZero(a.getMinutes()); + const sec = addTrailingZero(a.getSeconds()); + return date + '/' + month + '/' + year + ' ' + hour + ':' + min + ':' + sec; +} \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/util/logo.js b/extensions/kafka-client/ui/src/main/webapp/util/logo.js new file mode 100644 index 00000000000000..2f40efa91fb37f --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/util/logo.js @@ -0,0 +1,8 @@ +import {faviconLogo, logo} from "../config.js" + +export function setLogo(){ + $("#navbar-logo") + .attr("src", logo); + $("#favicon") + .attr("href", faviconLogo); +} \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/util/spinner.js b/extensions/kafka-client/ui/src/main/webapp/util/spinner.js new file mode 100644 index 00000000000000..a5ca80594e44ff --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/util/spinner.js @@ -0,0 +1,21 @@ +export function toggleSpinner(containerId, spinnerContainerId) { + const spinnerId = spinnerContainerId === undefined ? "#page-load-spinner" : "#" + spinnerContainerId; + const toggleContainerId = "#" + containerId; + let first; + let second; + + if ($(spinnerId).hasClass("shown")) { + first = toggleContainerId; + second = spinnerId; + } else { + second = toggleContainerId; + first = spinnerId; + } + + $(first) + .removeClass("hidden") + .addClass("shown"); + $(second) + .addClass("hidden") + .removeClass("shown"); +} \ No newline at end of file
    ").append(html); +} + +export function createTableHead(title) { + return $("") + .attr("scope", "col") + .text(title); +} + export function createIcon(iconClass) { return $("") .addClass("bi") .addClass(iconClass); } +export class CollapseRow { + constructor(collapseId) { + this.collapseId = collapseId; + const chevronIcon = createIcon("bi-chevron-right") + .addClass("rotate-icon"); + this.arrow = $("
    ") + .addClass("d-flex") + .addClass("justify-content-center") + .append(chevronIcon); + + Object.getOwnPropertyNames(CollapseRow.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + getCollapseContent(tableWidth, collapseContent) { + return $("