diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-ui/qwc-kafka-add-topic.js b/extensions/kafka-client/deployment/src/main/resources/dev-ui/qwc-kafka-add-topic.js index ad8ca2104ee67..c26c0d9cb26ea 100644 --- a/extensions/kafka-client/deployment/src/main/resources/dev-ui/qwc-kafka-add-topic.js +++ b/extensions/kafka-client/deployment/src/main/resources/dev-ui/qwc-kafka-add-topic.js @@ -59,6 +59,13 @@ export class QwcKafkaAddTopic extends LitElement { min="0" max="99"> + + ${this._renderButtons()}`; } @@ -70,10 +77,11 @@ export class QwcKafkaAddTopic extends LitElement { } _reset(){ - this._newTopic = new Object(); + this._newTopic = {}; this._newTopic.name = ''; this._newTopic.partitions = 1; this._newTopic.replications = 1; + this._newTopic.configs = undefined; } _cancel(){ @@ -89,11 +97,11 @@ export class QwcKafkaAddTopic extends LitElement { _submit(){ if(this._newTopic.name.trim() !== ''){ - this.jsonRpc.createTopic({ topicName: this._newTopic.name, partitions: parseInt(this._newTopic.partitions), - replications: parseInt(this._newTopic.replications) + replications: parseInt(this._newTopic.replications), + configs: this._newTopic.configs }).then(jsonRpcResponse => { this._reset(); const success = new CustomEvent("kafka-topic-added-success", { @@ -119,6 +127,17 @@ export class QwcKafkaAddTopic extends LitElement { _replicationsChanged(e){ this._newTopic.replications = e.detail.value; } + + _configsChanged(e){ + this._newTopic.configs = Object.fromEntries(e.detail.value.split(',') + .reduce((configs, item) => { + const split = item.trim().split('='); + if (split.length > 1) { + configs.set(split[0], split[1]); + } + return configs; + }, new Map())); + } } customElements.define('qwc-kafka-add-topic', QwcKafkaAddTopic); \ No newline at end of file diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java index 0ced0504abcda..8f8486630e18c 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java @@ -70,23 +70,23 @@ public Collection getConsumerGroups() throws Interrupt .values(); } - public boolean deleteTopic(String name) { + public boolean deleteTopic(final String name) { Collection topics = new ArrayList<>(); topics.add(name); DeleteTopicsResult dtr = client.deleteTopics(topics); return dtr.topicNameValues() != null; } - public boolean createTopic(KafkaCreateTopicRequest kafkaCreateTopicRq) { + public boolean createTopic(final KafkaCreateTopicRequest kafkaCreateTopicRq) { var partitions = Optional.ofNullable(kafkaCreateTopicRq.getPartitions()).orElse(1); var replications = Optional.ofNullable(kafkaCreateTopicRq.getReplications()).orElse((short) 1); var newTopic = new NewTopic(kafkaCreateTopicRq.getTopicName(), partitions, replications); - + newTopic.configs(Optional.ofNullable(kafkaCreateTopicRq.getConfigs()).orElse(Map.of())); CreateTopicsResult ctr = client.createTopics(List.of(newTopic)); return ctr.values() != null; } - public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) { + public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId) { return client.listConsumerGroupOffsets(groupId); } @@ -96,7 +96,7 @@ public Collection getAclInfo() throws InterruptedException, Executio return client.describeAcls(filter, options).values().get(); } - public Map describeTopics(Collection topicNames) + public Map describeTopics(final Collection topicNames) throws InterruptedException, ExecutionException { return client.describeTopics(topicNames) .allTopicNames() diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/KafkaJsonRPCService.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/KafkaJsonRPCService.java index c9d37cbb001e4..b5fe4ebdd3fcf 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/KafkaJsonRPCService.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/KafkaJsonRPCService.java @@ -30,10 +30,12 @@ public List getTopics() throws InterruptedException, ExecutionExcept return kafkaUiUtils.getTopics(); } - public List createTopic(String topicName, int partitions, int replications) + public List createTopic(final String topicName, final int partitions, final int replications, + Map configs) throws InterruptedException, ExecutionException { - KafkaCreateTopicRequest createTopicRequest = new KafkaCreateTopicRequest(topicName, partitions, (short) replications); + KafkaCreateTopicRequest createTopicRequest = new KafkaCreateTopicRequest(topicName, partitions, (short) replications, + configs); boolean created = kafkaAdminClient.createTopic(createTopicRequest); if (created) { return kafkaUiUtils.getTopics(); @@ -41,7 +43,7 @@ public List createTopic(String topicName, int partitions, int replic throw new RuntimeException("Topic [" + topicName + "] not created"); } - public List deleteTopic(String topicName) throws InterruptedException, ExecutionException { + public List deleteTopic(final String topicName) throws InterruptedException, ExecutionException { boolean deleted = kafkaAdminClient.deleteTopic(topicName); if (deleted) { return kafkaUiUtils.getTopics(); @@ -49,7 +51,7 @@ public List deleteTopic(String topicName) throws InterruptedExceptio throw new RuntimeException("Topic [" + topicName + "] not deleted"); } - public KafkaMessagePage topicMessages(String topicName) throws ExecutionException, InterruptedException { + public KafkaMessagePage topicMessages(final String topicName) throws ExecutionException, InterruptedException { List partitions = getPartitions(topicName); KafkaOffsetRequest offsetRequest = new KafkaOffsetRequest(topicName, partitions, Order.NEW_FIRST); Map offset = kafkaUiUtils.getOffset(offsetRequest); @@ -71,7 +73,7 @@ public KafkaMessagePage createMessage(String topicName, Integer partition, Strin return topicMessages(topicName); } - public List getPartitions(String topicName) throws ExecutionException, InterruptedException { + public List getPartitions(final String topicName) throws ExecutionException, InterruptedException { return new ArrayList<>(kafkaUiUtils.partitions(topicName)); } diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/model/request/KafkaCreateTopicRequest.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/model/request/KafkaCreateTopicRequest.java index b099bb06ca3e9..883a6ba1ecaa4 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/model/request/KafkaCreateTopicRequest.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/devui/model/request/KafkaCreateTopicRequest.java @@ -1,17 +1,22 @@ package io.quarkus.kafka.client.runtime.devui.model.request; +import java.util.Map; + public class KafkaCreateTopicRequest { private String topicName; private Integer partitions; private Short replications; + private Map configs; public KafkaCreateTopicRequest() { } - public KafkaCreateTopicRequest(String topicName, Integer partitions, Short replications) { + public KafkaCreateTopicRequest(final String topicName, final Integer partitions, final Short replications, + final Map configs) { this.topicName = topicName; this.partitions = partitions; this.replications = replications; + this.configs = configs; } public String getTopicName() { @@ -25,4 +30,9 @@ public Integer getPartitions() { public Short getReplications() { return replications; } + + public Map getConfigs() { + return configs; + } + }