diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 83a9594b3528c..d3902993aa765 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -1265,6 +1265,11 @@ quarkus-kafka-client-deployment ${project.version} + + io.quarkus + quarkus-kafka-client-ui + ${project.version} + io.quarkus quarkus-kafka-streams diff --git a/extensions/kafka-client/deployment/pom.xml b/extensions/kafka-client/deployment/pom.xml index c1312bd48e115..78ae0ae217423 100644 --- a/extensions/kafka-client/deployment/pom.xml +++ b/extensions/kafka-client/deployment/pom.xml @@ -44,6 +44,14 @@ io.quarkus quarkus-caffeine-deployment + + io.quarkus + quarkus-vertx-http-deployment + + + io.quarkus + quarkus-kafka-client-ui + org.testcontainers testcontainers diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeConfig.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeConfig.java index b975b460e869b..ab52f4b2dc7a1 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeConfig.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeConfig.java @@ -1,5 +1,6 @@ package io.quarkus.kafka.client.deployment; +import io.quarkus.runtime.annotations.ConfigDocSection; import io.quarkus.runtime.annotations.ConfigItem; import io.quarkus.runtime.annotations.ConfigPhase; import io.quarkus.runtime.annotations.ConfigRoot; @@ -28,4 +29,11 @@ public class KafkaBuildTimeConfig { */ @ConfigItem public KafkaDevServicesBuildTimeConfig devservices; + + /** + * Kafka UI configuration + */ + @ConfigItem + @ConfigDocSection + public KafkaBuildTimeUiConfig ui; } diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeUiConfig.java 
b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeUiConfig.java new file mode 100644 index 0000000000000..d8dfd5706a5d9 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeUiConfig.java @@ -0,0 +1,28 @@ +package io.quarkus.kafka.client.deployment; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.quarkus.runtime.annotations.ConfigItem; + +@ConfigGroup +public class KafkaBuildTimeUiConfig { + + /** + * The path where Kafka UI is available. + * The value `/` is not allowed as it blocks the application from serving anything else. + * By default, this URL will be resolved as a path relative to `${quarkus.http.non-application-root-path}`. + */ + @ConfigItem(defaultValue = "kafka-ui") + public String rootPath; + /** + * The path where the Kafka UI backend request handler is available, resolved relative to `${quarkus.http.non-application-root-path}`. + */ + @ConfigItem(name = "handlerpath", defaultValue = "kafka-admin") + public String handlerRootPath; + /** + * Always include the UI. By default, this will only be included in dev and test.
+ * Setting this to true will also include the UI in Prod + */ + @ConfigItem(defaultValue = "false") + public boolean alwaysInclude; + +} diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java index 41fb178c3ea4a..1557b9430add2 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java @@ -1,14 +1,21 @@ package io.quarkus.kafka.client.deployment; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Scanner; import java.util.Set; import java.util.function.Consumer; import java.util.logging.Level; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.security.auth.spi.LoginModule; @@ -70,10 +77,13 @@ import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.HotDeploymentWatchedFileBuildItem; import io.quarkus.deployment.builditem.IndexDependencyBuildItem; +import io.quarkus.deployment.builditem.LaunchModeBuildItem; import io.quarkus.deployment.builditem.LogCategoryBuildItem; import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; import io.quarkus.deployment.builditem.RuntimeConfigSetupCompleteBuildItem; +import io.quarkus.deployment.builditem.ShutdownContextBuildItem; import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem; 
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; import io.quarkus.deployment.builditem.nativeimage.NativeImageSecurityProviderBuildItem; @@ -82,9 +92,11 @@ import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem; import io.quarkus.deployment.logging.LogCleanupFilterBuildItem; import io.quarkus.deployment.pkg.NativeConfig; -import io.quarkus.kafka.client.runtime.KafkaBindingConverter; -import io.quarkus.kafka.client.runtime.KafkaRecorder; +import io.quarkus.kafka.client.runtime.*; import io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer; +import io.quarkus.kafka.client.runtime.ui.KafkaTopicClient; +import io.quarkus.kafka.client.runtime.ui.KafkaUiRecorder; +import io.quarkus.kafka.client.runtime.ui.KafkaUiUtils; import io.quarkus.kafka.client.serialization.BufferDeserializer; import io.quarkus.kafka.client.serialization.BufferSerializer; import io.quarkus.kafka.client.serialization.JsonArrayDeserializer; @@ -95,7 +107,19 @@ import io.quarkus.kafka.client.serialization.JsonbSerializer; import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer; import io.quarkus.kafka.client.serialization.ObjectMapperSerializer; +import io.quarkus.maven.dependency.GACT; +import io.quarkus.runtime.LaunchMode; +import io.quarkus.runtime.configuration.ConfigurationException; import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; +import io.quarkus.vertx.http.deployment.BodyHandlerBuildItem; +import io.quarkus.vertx.http.deployment.HttpRootPathBuildItem; +import io.quarkus.vertx.http.deployment.NonApplicationRootPathBuildItem; +import io.quarkus.vertx.http.deployment.RouteBuildItem; +import io.quarkus.vertx.http.deployment.webjar.WebJarBuildItem; +import io.quarkus.vertx.http.deployment.webjar.WebJarResourcesFilter; +import io.quarkus.vertx.http.deployment.webjar.WebJarResultsBuildItem; +import io.vertx.core.Handler; +import io.vertx.ext.web.RoutingContext; public class KafkaProcessor { @@ -144,6 +168,25 
@@ public class KafkaProcessor { static final DotName PARTITION_ASSIGNER = DotName .createSimple("org.apache.kafka.clients.consumer.internals.PartitionAssignor"); + // For the UI + private static final GACT KAFKA_UI_WEBJAR_ARTIFACT_KEY = new GACT("io.quarkus", "quarkus-kafka-client-ui", null, "jar"); + private static final String KAFKA_UI_WEBJAR_STATIC_RESOURCES_PATH = "META-INF/resources/kafka-ui/"; + private static final String FILE_TO_UPDATE = "config.js"; + private static final String LINE_TO_UPDATE = "export const api = '"; + private static final String LINE_FORMAT = LINE_TO_UPDATE + "%s';"; + private static final String UI_LINE_TO_UPDATE = "export const ui = '"; + private static final String UI_LINE_FORMAT = UI_LINE_TO_UPDATE + "%s';"; + private static final String LOGO_LINE_TO_UPDATE = "export const logo = '"; + private static final String LOGO_LINE_FORMAT = LOGO_LINE_TO_UPDATE + "%s';"; + private static final String UI_LOGO_PATH = "logo.png"; + // UI branding + private static final String BRANDING_DIR = "META-INF/branding/"; + private static final String BRANDING_LOGO_GENERAL = BRANDING_DIR + "logo.png"; + private static final String BRANDING_LOGO_MODULE = BRANDING_DIR + "quarkus-kafka-client-ui.png"; + private static final String BRANDING_STYLE_GENERAL = BRANDING_DIR + "style.css"; + private static final String BRANDING_STYLE_MODULE = BRANDING_DIR + "quarkus-kafka-client-ui.css"; + private static final String BRANDING_FAVICON_GENERAL = BRANDING_DIR + "favicon.ico"; + private static final String BRANDING_FAVICON_MODULE = BRANDING_DIR + "quarkus-kafka-client-ui.ico"; @BuildStep FeatureBuildItem feature() { @@ -165,7 +208,8 @@ void silenceUnwantedConfigLogs(BuildProducer logClean List ignoredMessages = new ArrayList<>(); for (String ignoredConfigProperty : ignoredConfigProperties) { - ignoredMessages.add("The configuration '" + ignoredConfigProperty + "' was supplied but isn't a known config."); + ignoredMessages + .add("The configuration '" +
ignoredConfigProperty + "' was supplied but isn't a known config."); } logCleanupFilters.produce(new LogCleanupFilterBuildItem("org.apache.kafka.clients.consumer.ConsumerConfig", @@ -478,4 +522,166 @@ void registerServiceBinding(Capabilities capabilities, KafkaBindingConverter.class.getName())); } } + + // Kafka UI related stuff + + @BuildStep + public AdditionalBeanBuildItem kafkaClientBeans() { + return AdditionalBeanBuildItem.builder() + .addBeanClass(KafkaAdminClient.class) + .addBeanClass(KafkaTopicClient.class) + .addBeanClass(KafkaUiUtils.class) + .setUnremovable() + .build(); + } + + @BuildStep + @Record(ExecutionTime.RUNTIME_INIT) + public void registerKafkaUiExecHandler( + BuildProducer routeProducer, + KafkaUiRecorder recorder, + LaunchModeBuildItem launchMode, + HttpRootPathBuildItem httpRootPathBuildItem, + NonApplicationRootPathBuildItem nonApplicationRootPathBuildItem, + KafkaBuildTimeConfig buildConfig, + BodyHandlerBuildItem bodyHandlerBuildItem, + ShutdownContextBuildItem shutdownContext) { + + if (shouldIncludeUi(launchMode, buildConfig)) { + String handlerPath = nonApplicationRootPathBuildItem.resolvePath(buildConfig.ui.handlerRootPath); + Handler executionHandler = recorder.kafkaControlHandler(); + HttpRootPathBuildItem.Builder requestBuilder = httpRootPathBuildItem.routeBuilder() + .routeFunction(handlerPath, recorder.routeFunction(bodyHandlerBuildItem.getHandler())) + .handler(executionHandler) + .routeConfigKey("quarkus.kafka-client-ui.root-path") + .displayOnNotFoundPage("Kafka UI Endpoint"); + + routeProducer.produce(requestBuilder.build()); + } + } + + @BuildStep + List uiBrandingFiles() { + return Stream.of(BRANDING_LOGO_GENERAL, + BRANDING_STYLE_GENERAL, + BRANDING_FAVICON_GENERAL, + BRANDING_LOGO_MODULE, + BRANDING_STYLE_MODULE, + BRANDING_FAVICON_MODULE).map(HotDeploymentWatchedFileBuildItem::new) + .collect(Collectors.toList()); + } + + @BuildStep + void getKafkaUiFinalDestination( + HttpRootPathBuildItem httpRootPath, + 
NonApplicationRootPathBuildItem nonApplicationRootPathBuildItem, + LaunchModeBuildItem launchMode, + KafkaBuildTimeConfig buildConfig, + BuildProducer webJarBuildProducer) { + + if (shouldIncludeUi(launchMode, buildConfig)) { + + if ("/".equals(buildConfig.ui.rootPath)) { + throw new ConfigurationException( + "quarkus.kafka-client-ui.root-path was set to \"/\", this is not allowed as it blocks the application from serving anything else.", + Collections.singleton("quarkus.kafka-client-ui.root-path")); + } + + String devUiPath = nonApplicationRootPathBuildItem.resolvePath("dev"); + String kafkaUiPath = nonApplicationRootPathBuildItem.resolvePath(buildConfig.ui.rootPath); + String kafkaHandlerPath = nonApplicationRootPathBuildItem.resolvePath(buildConfig.ui.handlerRootPath); + webJarBuildProducer.produce( + WebJarBuildItem.builder().artifactKey(KAFKA_UI_WEBJAR_ARTIFACT_KEY) + .root(KAFKA_UI_WEBJAR_STATIC_RESOURCES_PATH) + .filter(new WebJarResourcesFilter() { + @Override + public WebJarResourcesFilter.FilterResult apply(String fileName, InputStream file) + throws IOException { + if (fileName.endsWith(FILE_TO_UPDATE)) { + String content = new String(file.readAllBytes(), StandardCharsets.UTF_8); + content = updateUrl(content, kafkaHandlerPath, + LINE_TO_UPDATE, + LINE_FORMAT); + content = updateUrl(content, kafkaUiPath, + UI_LINE_TO_UPDATE, + UI_LINE_FORMAT); + content = updateUrl(content, + getLogoUrl(launchMode, kafkaUiPath + "/" + UI_LOGO_PATH, + kafkaUiPath + "/" + UI_LOGO_PATH), + LOGO_LINE_TO_UPDATE, + LOGO_LINE_FORMAT); + + return new WebJarResourcesFilter.FilterResult( + new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)), true); + } + + return new WebJarResourcesFilter.FilterResult(file, false); + } + }) + .build()); + } + } + + @BuildStep + @Record(ExecutionTime.RUNTIME_INIT) + void registerKafkaUiHandler( + BuildProducer routeProducer, + KafkaUiRecorder recorder, + LaunchModeBuildItem launchMode, + NonApplicationRootPathBuildItem 
nonApplicationRootPathBuildItem, + KafkaBuildTimeConfig buildConfig, + WebJarResultsBuildItem webJarResultsBuildItem, + ShutdownContextBuildItem shutdownContext) { + + WebJarResultsBuildItem.WebJarResult result = webJarResultsBuildItem.byArtifactKey(KAFKA_UI_WEBJAR_ARTIFACT_KEY); + if (result == null) { + return; + } + + if (shouldIncludeUi(launchMode, buildConfig)) { + String kafkaUiPath = nonApplicationRootPathBuildItem.resolvePath(buildConfig.ui.rootPath); + String finalDestination = result.getFinalDestination(); + + Handler handler = recorder.uiHandler(finalDestination, + kafkaUiPath, result.getWebRootConfigurations(), shutdownContext); + routeProducer.produce(nonApplicationRootPathBuildItem.routeBuilder() + .route(buildConfig.ui.rootPath) + .displayOnNotFoundPage("Kafka UI") + .routeConfigKey("quarkus.kafka-client.ui.root-path") + .handler(handler) + .build()); + + routeProducer.produce(nonApplicationRootPathBuildItem.routeBuilder() + .route(buildConfig.ui.rootPath + "*") + .handler(handler) + .build()); + + } + } + + // In dev mode, when you click on the logo, you should go to Dev UI + private String getLogoUrl(LaunchModeBuildItem launchMode, String devUIValue, String defaultValue) { + if (launchMode.getLaunchMode().equals(LaunchMode.DEVELOPMENT)) { + return devUIValue; + } + return defaultValue; + } + + private String updateUrl(String original, String path, String lineStartsWith, String format) { + try (Scanner scanner = new Scanner(original)) { + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + if (line.trim().startsWith(lineStartsWith)) { + String newLine = String.format(format, path); + return original.replace(line.trim(), newLine); + } + } + } + + return original; + } + + private static boolean shouldIncludeUi(LaunchModeBuildItem launchMode, KafkaBuildTimeConfig config) { + return launchMode.getLaunchMode().isDevOrTest() || config.ui.alwaysInclude; + } } diff --git 
a/extensions/kafka-client/deployment/src/main/resources/dev-templates/embedded.html b/extensions/kafka-client/deployment/src/main/resources/dev-templates/embedded.html new file mode 100644 index 0000000000000..312a6c5268e02 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-templates/embedded.html @@ -0,0 +1,3 @@ + + + Kafka UI diff --git a/extensions/kafka-client/pom.xml b/extensions/kafka-client/pom.xml index a0e0885d2b170..dd7448eb06106 100644 --- a/extensions/kafka-client/pom.xml +++ b/extensions/kafka-client/pom.xml @@ -15,6 +15,7 @@ pom + ui deployment runtime diff --git a/extensions/kafka-client/runtime/pom.xml b/extensions/kafka-client/runtime/pom.xml index 50ac39873257c..0e037445a128e 100644 --- a/extensions/kafka-client/runtime/pom.xml +++ b/extensions/kafka-client/runtime/pom.xml @@ -59,6 +59,11 @@ provided + + io.quarkus + quarkus-vertx-http + + io.quarkus quarkus-junit5-internal diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/health/KafkaHealthCheck.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/health/KafkaHealthCheck.java index a0b7c6648caa7..e9b9a24bd265d 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/health/KafkaHealthCheck.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/health/KafkaHealthCheck.java @@ -1,43 +1,23 @@ package io.quarkus.kafka.client.health; -import java.util.HashMap; -import java.util.Map; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.common.Node; import org.eclipse.microprofile.health.HealthCheck; import org.eclipse.microprofile.health.HealthCheckResponse; import org.eclipse.microprofile.health.HealthCheckResponseBuilder; import 
org.eclipse.microprofile.health.Readiness; -import io.smallrye.common.annotation.Identifier; +import io.quarkus.kafka.client.runtime.KafkaAdminClient; @Readiness @ApplicationScoped public class KafkaHealthCheck implements HealthCheck { - @Inject - @Identifier("default-kafka-broker") - Map config; - - private AdminClient client; - - @PostConstruct - void init() { - Map conf = new HashMap<>(config); - conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); - client = AdminClient.create(conf); - } + KafkaAdminClient kafkaAdminClient; - @PreDestroy - void stop() { - client.close(); + public KafkaHealthCheck(KafkaAdminClient kafkaAdminClient) { + this.kafkaAdminClient = kafkaAdminClient; } @Override @@ -45,7 +25,7 @@ public HealthCheckResponse call() { HealthCheckResponseBuilder builder = HealthCheckResponse.named("Kafka connection health check").up(); try { StringBuilder nodes = new StringBuilder(); - for (Node node : client.describeCluster().nodes().get()) { + for (Node node : kafkaAdminClient.getCluster().nodes().get()) { if (nodes.length() > 0) { nodes.append(','); } diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java new file mode 100644 index 0000000000000..78f68171bab4f --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java @@ -0,0 +1,76 @@ +package io.quarkus.kafka.client.runtime; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import javax.annotation.PostConstruct; +import
javax.annotation.PreDestroy; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.apache.kafka.clients.admin.*; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.resource.ResourcePatternFilter; + +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaCreateTopicRequest; +import io.smallrye.common.annotation.Identifier; + +@ApplicationScoped +public class KafkaAdminClient { + private static final int DEFAULT_ADMIN_CLIENT_TIMEOUT = 5000; + + @Inject + @Identifier("default-kafka-broker") + Map config; + + private AdminClient client; + + @PostConstruct + void init() { + Map conf = new HashMap<>(config); + conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_ADMIN_CLIENT_TIMEOUT); + client = AdminClient.create(conf); + } + + @PreDestroy + void stop() { + client.close(); + } + + public DescribeClusterResult getCluster() { + return client.describeCluster(); + } + + public Collection getTopics() throws InterruptedException, ExecutionException { + return client.listTopics().listings().get(); + } + + public boolean deleteTopic(String name) { + Collection topics = new ArrayList<>(); + topics.add(name); + DeleteTopicsResult dtr = client.deleteTopics(topics); + return dtr.topicNameValues() != null; + } + + public boolean createTopic(KafkaCreateTopicRequest kafkaCreateTopicRq) { + var partitions = Optional.ofNullable(kafkaCreateTopicRq.getPartitions()).orElse(1); + var replications = Optional.ofNullable(kafkaCreateTopicRq.getReplications()).orElse((short) 1); + var newTopic = new NewTopic(kafkaCreateTopicRq.getTopicName(), partitions, replications); + + CreateTopicsResult ctr = client.createTopics(List.of(newTopic)); + return ctr.values() != null; + } + + public Collection getAclInfo() throws InterruptedException, ExecutionException { + AclBindingFilter filter = new 
AclBindingFilter(ResourcePatternFilter.ANY, AccessControlEntryFilter.ANY); + var options = new DescribeAclsOptions().timeoutMs(1_000); + return client.describeAcls(filter, options).values().get(); + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java index 2be14e5717251..93e2ca309ab99 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java @@ -17,7 +17,7 @@ public class KafkaRuntimeConfigProducer { // not "kafka.", because we also inspect env vars, which start with "KAFKA_" private static final String CONFIG_PREFIX = "kafka"; - + private static final String UI_CONFIG_PREFIX = CONFIG_PREFIX + ".ui"; private static final String GROUP_ID = "group.id"; @Produces @@ -29,7 +29,10 @@ public Map createKafkaRuntimeConfig(Config config, ApplicationCo for (String propertyName : config.getPropertyNames()) { String propertyNameLowerCase = propertyName.toLowerCase(); - if (!propertyNameLowerCase.startsWith(CONFIG_PREFIX)) { + if (propertyNameLowerCase.startsWith(UI_CONFIG_PREFIX)) { + config.getOptionalValue(propertyName, String.class).orElse(""); + } + if (!propertyNameLowerCase.startsWith(CONFIG_PREFIX) || propertyNameLowerCase.startsWith(UI_CONFIG_PREFIX)) { continue; } // Replace _ by . - This is because Kafka properties tend to use . 
and env variables use _ for every special diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/AbstractHttpRequestHandler.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/AbstractHttpRequestHandler.java new file mode 100644 index 0000000000000..bc83093041b33 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/AbstractHttpRequestHandler.java @@ -0,0 +1,99 @@ +package io.quarkus.kafka.client.runtime.ui; + +import io.quarkus.arc.Arc; +import io.quarkus.arc.ManagedContext; +import io.quarkus.security.identity.CurrentIdentityAssociation; +import io.quarkus.security.identity.SecurityIdentity; +import io.quarkus.vertx.http.runtime.CurrentVertxRequest; +import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser; +import io.vertx.core.Handler; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.ext.web.RoutingContext; + +public abstract class AbstractHttpRequestHandler implements Handler { + private final CurrentIdentityAssociation currentIdentityAssociation; + private final CurrentVertxRequest currentVertxRequest; + private final ManagedContext currentManagedContext; + private final Handler currentManagedContextTerminationHandler; + + public AbstractHttpRequestHandler(CurrentIdentityAssociation currentIdentityAssociation, + CurrentVertxRequest currentVertxRequest) { + this.currentIdentityAssociation = currentIdentityAssociation; + this.currentVertxRequest = currentVertxRequest; + this.currentManagedContext = Arc.container().requestContext(); + this.currentManagedContextTerminationHandler = e -> currentManagedContext.terminate(); + } + + @Override + @SuppressWarnings("unchecked") // ignore currentManagedContextTerminationHandler types, just use Object + public void handle(final RoutingContext ctx) { + + if (currentManagedContext.isActive()) { + handleWithIdentity(ctx); + } else { + + currentManagedContext.activate(); + ctx.response() 
+ .endHandler(currentManagedContextTerminationHandler) + .exceptionHandler(currentManagedContextTerminationHandler) + .closeHandler(currentManagedContextTerminationHandler); + + try { + handleWithIdentity(ctx); + } catch (Throwable t) { + currentManagedContext.terminate(); + throw t; + } + } + } + + public void doHandle(RoutingContext ctx) { + try { + HttpServerRequest request = ctx.request(); + + switch (request.method().name()) { + case "OPTIONS": + handleOptions(ctx); + break; + case "POST": + handlePost(ctx); + break; + case "GET": + handleGet(ctx); + break; + default: + ctx.next(); + break; + } + } catch (Exception e) { + ctx.fail(e); + } + } + + private void handleWithIdentity(final RoutingContext ctx) { + if (currentIdentityAssociation != null) { + QuarkusHttpUser existing = (QuarkusHttpUser) ctx.user(); + if (existing != null) { + SecurityIdentity identity = existing.getSecurityIdentity(); + currentIdentityAssociation.setIdentity(identity); + } else { + currentIdentityAssociation.setIdentity(QuarkusHttpUser.getSecurityIdentity(ctx, null)); + } + } + currentVertxRequest.setCurrent(ctx); + doHandle(ctx); + } + + public abstract void handlePost(RoutingContext event); + + public abstract void handleGet(RoutingContext event); + + public abstract void handleOptions(RoutingContext event); + + protected String getRequestPath(RoutingContext event) { + HttpServerRequest request = event.request(); + return request.path(); + } + + //TODO: service methods for HTTP requests +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaTopicClient.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaTopicClient.java new file mode 100644 index 0000000000000..a02d760185733 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaTopicClient.java @@ -0,0 +1,50 @@ +package io.quarkus.kafka.client.runtime.ui; + +import java.util.HashMap; +import 
java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; +import javax.inject.Singleton; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.TopicPartitionInfo; + +import io.smallrye.common.annotation.Identifier; + +@Singleton +public class KafkaTopicClient { + //TODO: inject me + private AdminClient adminClient; + + @Inject + @Identifier("default-kafka-broker") + Map config; + + @PostConstruct + void init() { + Map conf = new HashMap<>(config); + conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); + adminClient = AdminClient.create(conf); + } + + public List partitions(String topicName) throws ExecutionException, InterruptedException { + return adminClient.describeTopics(List.of(topicName)) + .allTopicNames() + .get() + .values().stream() + .reduce((a, b) -> { + throw new IllegalStateException( + "Requested info about single topic, but got result of multiple: " + a + ", " + b); + }) + .orElseThrow(() -> new IllegalStateException( + "Requested info about a topic, but nothing found. 
Topic name: " + topicName)) + .partitions().stream() + .map(TopicPartitionInfo::partition) + .collect(Collectors.toList()); + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java new file mode 100644 index 0000000000000..54f18f49a61ab --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java @@ -0,0 +1,113 @@ + +package io.quarkus.kafka.client.runtime.ui; + +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; + +import java.util.concurrent.ExecutionException; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.quarkus.arc.Arc; +import io.quarkus.kafka.client.runtime.KafkaAdminClient; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaCreateTopicRequest; +import io.quarkus.security.identity.CurrentIdentityAssociation; +import io.quarkus.vertx.http.runtime.CurrentVertxRequest; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.ext.web.RoutingContext; + +public class KafkaUiHandler extends AbstractHttpRequestHandler { + + public KafkaUiHandler(CurrentIdentityAssociation currentIdentityAssociation, CurrentVertxRequest currentVertxRequest) { + super(currentIdentityAssociation, currentVertxRequest); + } + + @Override + public void handlePost(RoutingContext event) { + if (event.body() == null) { + endResponse(event, BAD_REQUEST, "Request body is null"); + return; + } + var body = event.body().asJsonObject(); + if (body == null) { + endResponse(event, BAD_REQUEST, "Request JSON body is null"); + return; + } + var action = body.getString("action"); + + var message = "OK"; + var error = ""; + + var webUtils = kafkaWebUiUtils(); + var adminClient = kafkaAdminClient(); + + boolean res = false; + if (null != action) { + try 
{ + switch (action) { + case "getInfo": + message = webUtils.toJson(webUtils.getKafkaInfo()); + res = true; + break; + case "getAclInfo": + message = webUtils.toJson(webUtils.getAclInfo()); + res = true; + break; + case "createTopic": + var topicCreateRq = event.body().asPojo(KafkaCreateTopicRequest.class); + res = adminClient.createTopic(topicCreateRq); + message = webUtils.toJson(webUtils.getTopics()); + break; + case "deleteTopic": + res = adminClient.deleteTopic(body.getString("key")); + message = "{}"; + res = true; + break; + case "getTopics": + message = webUtils.toJson(webUtils.getTopics()); + res = true; + break; + default: + break; + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } catch (ExecutionException ex) { + throw new RuntimeException(ex); + } + } + + if (res) { + endResponse(event, OK, message); + } else { + message = "ERROR: " + error; + endResponse(event, BAD_REQUEST, message); + } + } + + private void endResponse(RoutingContext event, HttpResponseStatus status, String message) { + event.response().setStatusCode(status.code()); + event.response().end(message); + } + + private KafkaUiUtils kafkaWebUiUtils() { + return Arc.container().instance(KafkaUiUtils.class).get(); + } + + @Override + public void handleGet(RoutingContext event) { + //TODO: move pure get requests processing here + HttpServerRequest request = event.request(); + String path = request.path(); + endResponse(event, OK, "GET method is not supported yet. 
Path is: " + path); + } + + @Override + public void handleOptions(RoutingContext event) { + endResponse(event, OK, "OPTION method is not supported yet"); + } + + private KafkaAdminClient kafkaAdminClient() { + return Arc.container().instance(KafkaAdminClient.class).get(); + } + +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiRecorder.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiRecorder.java new file mode 100644 index 0000000000000..21d89717ffcbc --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiRecorder.java @@ -0,0 +1,49 @@ +package io.quarkus.kafka.client.runtime.ui; + +import java.util.List; +import java.util.function.Consumer; + +import io.quarkus.arc.Arc; +import io.quarkus.arc.InstanceHandle; +import io.quarkus.runtime.ShutdownContext; +import io.quarkus.runtime.annotations.Recorder; +import io.quarkus.security.identity.CurrentIdentityAssociation; +import io.quarkus.vertx.http.runtime.CurrentVertxRequest; +import io.quarkus.vertx.http.runtime.devmode.FileSystemStaticHandler; +import io.quarkus.vertx.http.runtime.webjar.WebJarStaticHandler; +import io.vertx.core.Handler; +import io.vertx.ext.web.Route; +import io.vertx.ext.web.RoutingContext; + +/** + * Handles requests from kafka UI and html/js of UI + */ +@Recorder +public class KafkaUiRecorder { + + public Handler kafkaControlHandler() { + return new KafkaUiHandler(getCurrentIdentityAssociation(), + Arc.container().instance(CurrentVertxRequest.class).get()); + } + + public Consumer routeFunction(Handler bodyHandler) { + return route -> route.handler(bodyHandler); + } + + public Handler uiHandler(String finalDestination, String uiPath, + List webRootConfigurations, + ShutdownContext shutdownContext) { + WebJarStaticHandler handler = new WebJarStaticHandler(finalDestination, uiPath, webRootConfigurations); + shutdownContext.addShutdownTask(new 
ShutdownContext.CloseRunnable(handler)); + return handler; + } + + private CurrentIdentityAssociation getCurrentIdentityAssociation() { + InstanceHandle identityAssociations = Arc.container() + .instance(CurrentIdentityAssociation.class); + if (identityAssociations.isAvailable()) { + return identityAssociations.get(); + } + return null; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java new file mode 100644 index 0000000000000..31ac51173e07e --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java @@ -0,0 +1,135 @@ +package io.quarkus.kafka.client.runtime.ui; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import javax.inject.Singleton; + +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.common.Node; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.quarkus.kafka.client.runtime.KafkaAdminClient; +import io.quarkus.kafka.client.runtime.ui.model.response.KafkaClusterInfo; +import io.quarkus.kafka.client.runtime.ui.model.response.KafkaInfo; +import io.quarkus.kafka.client.runtime.ui.model.response.KafkaNode; +import io.quarkus.kafka.client.runtime.ui.model.response.KafkaTopic; + +@Singleton +public class KafkaUiUtils { + + private final KafkaAdminClient kafkaAdminClient; + + private final KafkaTopicClient kafkaTopicClient; + + private final ObjectMapper objectMapper; + + public KafkaUiUtils(KafkaAdminClient kafkaAdminClient, KafkaTopicClient kafkaTopicClient, ObjectMapper objectMapper) { + this.kafkaAdminClient = kafkaAdminClient; + this.kafkaTopicClient = kafkaTopicClient; + 
this.objectMapper = objectMapper; + } + + public KafkaInfo getKafkaInfo() throws ExecutionException, InterruptedException { + var clusterInfo = getClusterInfo(); + var broker = clusterInfo.getController().asFullNodeName(); + var topics = getTopics(); + return new KafkaInfo(broker, clusterInfo, topics); + } + + public KafkaClusterInfo getClusterInfo() throws ExecutionException, InterruptedException { + return clusterInfo(kafkaAdminClient.getCluster()); + } + + private KafkaNode kafkaNode(Node node) { + return new KafkaNode(node.host(), node.port(), node.idString()); + } + + private KafkaClusterInfo clusterInfo(DescribeClusterResult dcr) throws InterruptedException, ExecutionException { + var controller = kafkaNode(dcr.controller().get()); + var nodes = new ArrayList(); + for (var node : dcr.nodes().get()) { + nodes.add(kafkaNode(node)); + } + var aclOperations = dcr.authorizedOperations().get(); + + var aclOperationsStr = new StringBuilder(); + if (aclOperations != null) { + for (var operation : dcr.authorizedOperations().get()) { + if (aclOperationsStr.length() == 0) { + aclOperationsStr.append(", "); + } + aclOperationsStr.append(operation.name()); + } + } else { + aclOperationsStr = new StringBuilder("NONE"); + } + + return new KafkaClusterInfo( + dcr.clusterId().get(), + controller, + nodes, + aclOperationsStr.toString()); + } + + public List getTopics() throws InterruptedException, ExecutionException { + var res = new ArrayList(); + for (TopicListing tl : kafkaAdminClient.getTopics()) { + res.add(kafkaTopic(tl)); + } + return res; + } + + private KafkaTopic kafkaTopic(TopicListing tl) throws ExecutionException, InterruptedException { + var partitions = partitions(tl.name()); + return new KafkaTopic( + tl.name(), + tl.topicId().toString(), + partitions.size(), + tl.isInternal()); + } + + public Collection partitions(String topicName) throws ExecutionException, InterruptedException { + return kafkaTopicClient.partitions(topicName); + } + + public KafkaAclInfo 
getAclInfo() throws InterruptedException, ExecutionException { + var clusterInfo = clusterInfo(kafkaAdminClient.getCluster()); + var entries = new ArrayList(); + //TODO: fix it after proper error message impl + try { + var acls = kafkaAdminClient.getAclInfo(); + for (var acl : acls) { + var entry = new KafkaAclEntry( + acl.entry().operation().name(), + acl.entry().principal(), + acl.entry().permissionType().name(), + acl.pattern().toString()); + entries.add(entry); + } + } catch (Exception e) { + // this mostly means that ALC controller is absent + } + return new KafkaAclInfo( + clusterInfo.getId(), + clusterInfo.getController().asFullNodeName(), + clusterInfo.getAclOperations(), + entries); + } + + public String toJson(Object o) { + String res; + try { + res = objectMapper.writeValueAsString(o); + } catch (JsonProcessingException ex) { + //FIXME: + res = ""; + } + return res; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaCreateTopicRequest.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaCreateTopicRequest.java new file mode 100644 index 0000000000000..8fbe12f9c2500 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaCreateTopicRequest.java @@ -0,0 +1,28 @@ +package io.quarkus.kafka.client.runtime.ui.model.request; + +public class KafkaCreateTopicRequest { + private String topicName; + private Integer partitions; + private Short replications; + + public KafkaCreateTopicRequest() { + } + + public KafkaCreateTopicRequest(String topicName, Integer partitions, Short replications) { + this.topicName = topicName; + this.partitions = partitions; + this.replications = replications; + } + + public String getTopicName() { + return topicName; + } + + public Integer getPartitions() { + return partitions; + } + + public Short getReplications() { + return replications; + } +} diff 
--git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaAclEntry.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaAclEntry.java new file mode 100644 index 0000000000000..b32a0d729f6b7 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaAclEntry.java @@ -0,0 +1,34 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +public class KafkaAclEntry { + private String operation; + private String principal; + private String perm; + private String pattern; + + public KafkaAclEntry() { + } + + public KafkaAclEntry(String operation, String principal, String perm, String pattern) { + this.operation = operation; + this.principal = principal; + this.perm = perm; + this.pattern = pattern; + } + + public String getOperation() { + return operation; + } + + public String getPrincipal() { + return principal; + } + + public String getPerm() { + return perm; + } + + public String getPattern() { + return pattern; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaAclInfo.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaAclInfo.java new file mode 100644 index 0000000000000..4e53287f220b7 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaAclInfo.java @@ -0,0 +1,37 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +import java.util.ArrayList; +import java.util.List; + +public class KafkaAclInfo { + private String clusterId; + private String broker; + private String aclOperations; + private List entries = new ArrayList<>(); + + public KafkaAclInfo() { + } + + public KafkaAclInfo(String clusterId, String broker, String aclOperations, List entries) { + this.clusterId = clusterId; + 
this.broker = broker; + this.aclOperations = aclOperations; + this.entries = entries; + } + + public String getClusterId() { + return clusterId; + } + + public String getBroker() { + return broker; + } + + public String getAclOperations() { + return aclOperations; + } + + public List getEntries() { + return entries; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaClusterInfo.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaClusterInfo.java new file mode 100644 index 0000000000000..71e8e67c69b11 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaClusterInfo.java @@ -0,0 +1,37 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +import java.util.ArrayList; +import java.util.List; + +public class KafkaClusterInfo { + private String id; + private KafkaNode controller; + private List nodes = new ArrayList<>(); + private String aclOperations; + + public KafkaClusterInfo() { + } + + public KafkaClusterInfo(String id, KafkaNode controller, List nodes, String aclOperations) { + this.id = id; + this.controller = controller; + this.nodes = nodes; + this.aclOperations = aclOperations; + } + + public String getId() { + return id; + } + + public KafkaNode getController() { + return controller; + } + + public List getNodes() { + return nodes; + } + + public String getAclOperations() { + return aclOperations; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaInfo.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaInfo.java new file mode 100644 index 0000000000000..d095170b8bdf8 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaInfo.java @@ -0,0 +1,31 @@ +package 
io.quarkus.kafka.client.runtime.ui.model.response; + +import java.util.List; + +public class KafkaInfo { + private String broker; + private KafkaClusterInfo clusterInfo; + private List topics; + + public KafkaInfo() { + } + + public KafkaInfo(String broker, KafkaClusterInfo clusterInfo, List topics) { + this.broker = broker; + this.clusterInfo = clusterInfo; + this.topics = topics; + } + + public String getBroker() { + return broker; + } + + public List getTopics() { + return topics; + } + + public KafkaClusterInfo getClusterInfo() { + return clusterInfo; + } + +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaNode.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaNode.java new file mode 100644 index 0000000000000..137645a7c29ee --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaNode.java @@ -0,0 +1,32 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +public class KafkaNode { + private String host; + private int port; + private String id; + + public KafkaNode() { + } + + public KafkaNode(String host, int port, String id) { + this.host = host; + this.port = port; + this.id = id; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getId() { + return id; + } + + public String asFullNodeName() { + return host + ":" + port; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaTopic.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaTopic.java new file mode 100644 index 0000000000000..b678b50afc344 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaTopic.java @@ -0,0 +1,40 @@ +package 
io.quarkus.kafka.client.runtime.ui.model.response; + +public class KafkaTopic { + private String name; + private String topicId; + private int partitionsCount; + private boolean internal; + + public KafkaTopic() { + } + + public KafkaTopic(String name, String topicId, int partitionsCount, boolean internal) { + this.name = name; + this.topicId = topicId; + this.partitionsCount = partitionsCount; + this.internal = internal; + } + + public String getName() { + return name; + } + + public String getTopicId() { + return topicId; + } + + public int getPartitionsCount() { + return partitionsCount; + } + + public boolean isInternal() { + return internal; + } + + public String toString() { + StringBuilder sb = new StringBuilder(name); + sb.append(" : ").append(topicId); + return sb.toString(); + } +} diff --git a/extensions/kafka-client/ui/pom.xml b/extensions/kafka-client/ui/pom.xml new file mode 100644 index 0000000000000..31960a9ee918e --- /dev/null +++ b/extensions/kafka-client/ui/pom.xml @@ -0,0 +1,212 @@ + + + 4.0.0 + + + io.quarkus + quarkus-kafka-client-parent + 999-SNAPSHOT + + + quarkus-kafka-client-ui + jar + Quarkus - Kafka - Client - UI + + + kafka-ui + 0.9.15 + 1.9.1 + + + + + + org.webjars + bootstrap + provided + + + org.webjars + font-awesome + provided + + + org.webjars + jquery + provided + + + org.webjars + bootstrap-multiselect + ${bootstrap-mutiliselect.version} + + + org.webjars.npm + bootstrap-icons + ${bootstrap-icons.version} + + + + + + + + maven-resources-plugin + + + copy-web + generate-sources + + copy-resources + + + ${project.build.directory}/classes/META-INF/resources/${path.kafkaui} + + + ${basedir}/src/main/webapp + + **/*.* + + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.3.0 + + + install-js + generate-sources + + unpack + + + + + + org.webjars + bootstrap + ${webjar.bootstrap.version} + jar + true + ${project.build.directory}/classes/META-INF/resources/${path.kafkaui}/css/ + **/bootstrap.min.css, 
**/bootstrap.min.css.map + + + + + + org.webjars + bootstrap + ${webjar.bootstrap.version} + jar + true + ${project.build.directory}/classes/META-INF/resources/${path.kafkaui}/js/ + **/bootstrap.bundle.min.js, **/bootstrap.bundle.min.js.map + + + + + + org.webjars + bootstrap-multiselect + ${bootstrap-mutiliselect.version} + jar + true + ${project.build.directory}/classes/META-INF/resources/${path.kafkaui}/js/ + **/bootstrap-multiselect.js + + + + + + org.webjars + bootstrap-multiselect + ${bootstrap-mutiliselect.version} + jar + true + ${project.build.directory}/classes/META-INF/resources/${path.kafkaui}/css/ + **/bootstrap-multiselect.css + + + + + + org.webjars.npm + bootstrap-icons + ${bootstrap-icons.version} + jar + true + ${project.build.directory}/classes/META-INF/resources/${path.kafkaui}/css/ + **/font/bootstrap-icons.css + + + + + + org.webjars.npm + bootstrap-icons + ${bootstrap-icons.version} + jar + true + ${project.build.directory}/classes/META-INF/resources/${path.kafkaui}/css/fonts/ + **/font/fonts/ + + + + + + + + org.webjars + jquery + ${webjar.jquery.version} + jar + true + ${project.build.directory}/classes/META-INF/resources/${path.kafkaui}/js/ + **/jquery.min.js, **/jquery.min.js.map + + + + + + + org.webjars + font-awesome + ${webjar.font-awesome.version} + jar + true + ${project.build.directory}/classes/META-INF/resources/${path.kafkaui}/fontawesome/css + **/css/all.min.css + + + + + + org.webjars + font-awesome + ${webjar.font-awesome.version} + jar + true + ${project.build.directory}/classes/META-INF/resources/${path.kafkaui}/fontawesome/webfonts + **/webfonts/**.* + + + + + + + + + + + + + + + diff --git a/extensions/kafka-client/ui/src/main/webapp/favicon.ico b/extensions/kafka-client/ui/src/main/webapp/favicon.ico new file mode 100644 index 0000000000000..b4ef4208a6f48 Binary files /dev/null and b/extensions/kafka-client/ui/src/main/webapp/favicon.ico differ diff --git a/extensions/kafka-client/ui/src/main/webapp/index.html 
b/extensions/kafka-client/ui/src/main/webapp/index.html new file mode 100644 index 0000000000000..975d965858710 --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/index.html @@ -0,0 +1,232 @@ + + + + + kafka-devUI + + + + + + + + + + +
+ + +
+
+
+ + + + + + + + + + + + +
Topic NameIdPartitions countNumber of msg
+
+ +
+
+
+
+
+ + + + + + + + + + + + + + +
Offset + + Timestamp KeyValue
+
+ +
+
+ + +
+
+ Schema registry is not implemented yet.
+
+
+
+ + + + + + + + + + + + + +
StateIdCoordinatorProtocolMembersLag(Sum)
+
+
+
+
+ + + + + + + + + + + + +
Member IDHostPartitionsLag(Sum)
+
+
+
+
+
+ Kafka cluster id: 
+ Controller node (broker): 
+ ACL operations: 
+
+
+

Access Control Lists

+
+ + + + + + + + + + + +
OperationPrincipalPermissionsPattern
+
+
+
+
+
+ Kafka cluster id: 
+ Controller node (broker): 
+ ACL operations: 
+
+
+

Cluster nodes

+
+ + + + + + + + + + +
IdHostPort
+
+
+
+
+ + + + + + + diff --git a/extensions/kafka-client/ui/src/main/webapp/logo.png b/extensions/kafka-client/ui/src/main/webapp/logo.png new file mode 100644 index 0000000000000..6a1626104eb98 Binary files /dev/null and b/extensions/kafka-client/ui/src/main/webapp/logo.png differ diff --git a/extensions/kafka-client/ui/src/main/webapp/pages/accessControlListPage.js b/extensions/kafka-client/ui/src/main/webapp/pages/accessControlListPage.js new file mode 100644 index 0000000000000..2e396ec39874a --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/pages/accessControlListPage.js @@ -0,0 +1,49 @@ +import {doPost, errorPopUp} from "../web/web.js"; +import {createTableItem} from "../util/contentManagement.js"; +import {toggleSpinner} from "../util/spinner.js"; + +export default class AccessControlListPage{ + constructor(containerId) { + this.containerId = containerId; + Object.getOwnPropertyNames(AccessControlListPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + + open() { + const req = { + action: "getAclInfo" + }; + doPost(req, (data) => { + let that = this; + setTimeout(function () { + that.updateInfo(data); + toggleSpinner(that.containerId); + }, 2000); + }, data => { + errorPopUp("Error getting Kafka ACL info: ", data); + }); + } + + updateInfo(data) { + $('#acluster-id').html(data.clusterId); + $('#acluster-controller').html(data.broker); + $('#acluster-acl').html(data.aclOperations); + + const acls = data.entires; + let aclTable = $('#acl-table tbody'); + aclTable.empty(); + for (let i = 0; i < acls.length; i++) { + const e = acls[i]; + let tableRow = $(""); + tableRow.append(createTableItem(e.operation)); + tableRow.append(createTableItem(e.prinipal)); + tableRow.append(createTableItem(e.perm)); + tableRow.append(createTableItem(e.pattern)); + aclTable.append(tableRow); + } + } +} \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/pages/nodesPage.js 
b/extensions/kafka-client/ui/src/main/webapp/pages/nodesPage.js new file mode 100644 index 0000000000000..ad8675344e22b --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/pages/nodesPage.js @@ -0,0 +1,48 @@ +import {doPost, errorPopUp} from "../web/web.js"; +import {createTableItem} from "../util/contentManagement.js"; +import {toggleSpinner} from "../util/spinner.js"; + +export default class NodesPage { + constructor(containerId) { + this.containerId = containerId; + Object.getOwnPropertyNames(NodesPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + open() { + const req = { + action: "getInfo" + }; + doPost(req, (data) => { + let that = this; + setTimeout(function () { + that.updateInfo(data); + toggleSpinner(that.containerId); + }, 2000); + }, data => { + errorPopUp("Error getting Kafka info: ", data); + }); + toggleSpinner(this.containerId); + } + + updateInfo(data) { + $('#cluster-id').html(data.clusterInfo.id); + $('#cluster-controller').html(data.broker); + $('#cluster-acl').html(data.clusterInfo.aclOperations); + + const nodes = data.clusterInfo.nodes; + let clusterNodesTable = $('#cluster-table tbody'); + clusterNodesTable.empty(); + for (let i = 0; i < nodes.length; i++) { + const d = nodes[i]; + let tableRow = $(""); + tableRow.append(createTableItem(d.id)); + tableRow.append(createTableItem(d.host)); + tableRow.append(createTableItem(d.port)); + clusterNodesTable.append(tableRow); + } + } +} \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/pages/schemaPage.js b/extensions/kafka-client/ui/src/main/webapp/pages/schemaPage.js new file mode 100644 index 0000000000000..82b3f5f8d108c --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/pages/schemaPage.js @@ -0,0 +1,16 @@ +export default class SchemaPage{ + constructor(containerId) { + this.containerId = containerId; + Object.getOwnPropertyNames(SchemaPage.prototype).forEach((key) => { + 
if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + // TODO: stub. must be implemented by all pages + open(){ + + } + +} \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/quarkus_icon_rgb_reverse.svg b/extensions/kafka-client/ui/src/main/webapp/quarkus_icon_rgb_reverse.svg new file mode 100644 index 0000000000000..1969e1e886af3 --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/quarkus_icon_rgb_reverse.svg @@ -0,0 +1 @@ +quarkus_icon_rgb_1024px_reverse \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/util/contentManagement.js b/extensions/kafka-client/ui/src/main/webapp/util/contentManagement.js new file mode 100644 index 0000000000000..50df67b1ccef4 --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/util/contentManagement.js @@ -0,0 +1,29 @@ +export function createTableItem(text) { + return $("", { + text: text + }); +} + +export function createIcon(iconClass) { + return $("") + .addClass("bi") + .addClass(iconClass); +} + +export function showItem(selector){ + selector.addClass("shown") + .removeClass("hidden"); +} + +export function hideItem(selector){ + selector.addClass("hidden") + .removeClass("shown"); +} + +export function toggleItem(selector) { + if (selector.hasClass("shown")) { + hideItem(selector); + } else { + showItem(selector); + } +} \ No newline at end of file diff --git a/extensions/kafka-client/ui/src/main/webapp/web/web.js b/extensions/kafka-client/ui/src/main/webapp/web/web.js new file mode 100644 index 0000000000000..6ba79b5c19720 --- /dev/null +++ b/extensions/kafka-client/ui/src/main/webapp/web/web.js @@ -0,0 +1,22 @@ +import {api} from "../config.js" + +export function doPost(data, successCallback, errorCallback) { + $.ajax({ + url: api, + type: 'POST', + data: JSON.stringify(data), + contentType: "application/json; charset=utf-8", + dataType: 'json', + context: this, + success: (data) => successCallback(data), + 
error: (data, errorType, errorObj) => errorCallback(data, errorType, errorObj) + }); +} + +export function errorPopUp() { + let message = ""; + for (let i = 0; i < arguments.length; i++) { + message += arguments[i] + " "; + } + alert(message); +}