From 912d14a7a6184069f12a26a9af74624ea55014dd Mon Sep 17 00:00:00 2001 From: Daniel Petisme Date: Wed, 6 Oct 2021 13:39:54 +0200 Subject: [PATCH] Introducing Kafka Streams DevUI --- build-parent/pom.xml | 1 + .../deployment/KafkaStreamsProcessor.java | 4 +- .../devconsole/DevConsoleProcessor.java | 15 ++ .../resources/dev-templates/embedded.html | 3 + .../dev-templates/kafka-streams-topology.html | 178 ++++++++++++++++++ .../runtime/TopologyDescriptionSupplier.java | 17 ++ extensions/vertx-http/deployment/pom.xml | 13 ++ 7 files changed, 230 insertions(+), 1 deletion(-) create mode 100644 extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/devconsole/DevConsoleProcessor.java create mode 100644 extensions/kafka-streams/deployment/src/main/resources/dev-templates/embedded.html create mode 100644 extensions/kafka-streams/deployment/src/main/resources/dev-templates/kafka-streams-topology.html create mode 100644 extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/TopologyDescriptionSupplier.java diff --git a/build-parent/pom.xml b/build-parent/pom.xml index 3e6e28fade2082..b37b364ae2614d 100644 --- a/build-parent/pom.xml +++ b/build-parent/pom.xml @@ -117,6 +117,7 @@ 5.15.2 3.5.1 5.62.0 + 8.9.1 0.12.1 diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java index 93774a61fbe28b..d1de7f0a2bf9bf 100644 --- a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java +++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java @@ -32,6 +32,7 @@ import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.builditem.nativeimage.RuntimeReinitializedClassBuildItem; import io.quarkus.deployment.pkg.NativeConfig; +import io.quarkus.kafka.streams.runtime.*; import io.quarkus.kafka.streams.runtime.KafkaStreamsProducer; import io.quarkus.kafka.streams.runtime.KafkaStreamsRecorder; import io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig; @@ -53,7 +54,7 @@ void build(BuildProducer feature, registerClassesThatAreLoadedThroughReflection(reflectiveClasses, launchMode); registerClassesThatAreAccessedViaJni(jniRuntimeAccessibleClasses); - addSupportForRocksDbLib(nativeLibs, config); + // addSupportForRocksDbLib(nativeLibs, config); enableLoadOfNativeLibs(reinitialized); } @@ -184,4 +185,5 @@ void addHealthChecks(KafkaStreamsBuildTimeConfig buildTimeConfig, BuildProducer< "io.quarkus.kafka.streams.runtime.health.KafkaStreamsStateHealthCheck", buildTimeConfig.healthEnabled)); } + } diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/devconsole/DevConsoleProcessor.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/devconsole/DevConsoleProcessor.java new file mode 100644 index 00000000000000..036b0effd41a07 --- /dev/null +++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/devconsole/DevConsoleProcessor.java @@ -0,0 +1,15 @@ +package io.quarkus.kafka.streams.deployment.devconsole; + +import io.quarkus.deployment.IsDevelopment; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.devconsole.spi.DevConsoleRuntimeTemplateInfoBuildItem; +import io.quarkus.kafka.streams.runtime.TopologyDescriptionSupplier; + +public class DevConsoleProcessor { + + @BuildStep(onlyIf = IsDevelopment.class) + public DevConsoleRuntimeTemplateInfoBuildItem collectInfos() { + return new DevConsoleRuntimeTemplateInfoBuildItem("topology", new TopologyDescriptionSupplier()); + } + +} diff --git a/extensions/kafka-streams/deployment/src/main/resources/dev-templates/embedded.html b/extensions/kafka-streams/deployment/src/main/resources/dev-templates/embedded.html new file mode 100644 index 00000000000000..68aca76e2bebe5 --- /dev/null +++ b/extensions/kafka-streams/deployment/src/main/resources/dev-templates/embedded.html @@ -0,0 +1,3 @@ + + + Topology diff --git a/extensions/kafka-streams/deployment/src/main/resources/dev-templates/kafka-streams-topology.html b/extensions/kafka-streams/deployment/src/main/resources/dev-templates/kafka-streams-topology.html new file mode 100644 index 00000000000000..38de746e8606f1 --- /dev/null +++ b/extensions/kafka-streams/deployment/src/main/resources/dev-templates/kafka-streams-topology.html @@ -0,0 +1,178 @@ +{#include main fluid=true} +{#style} +textarea { + resize: none; + font-family: SFMono-Regular,Menlo,Monaco,Consolas,"Liberation Mono","Courier New",monospace; +} +{/style} +{#scriptref} + +{/scriptref} + +{#script} +function toMermaid(topology) { + var lines = topology.split('\n'); + var subTopologies = []; + var outside = []; + var currentGraphNodeName; + var topicList = []; + var storeList = []; + var name = (value) => value.replaceAll("-", "-
"); + + var graph = { + visit: function(lines) { + for(const line of lines) { + switch(true) { + case subTopology.pattern.test(line): + subTopology.visit(line); + break; + case source.pattern.test(line): + source.visit(line); + break; + case processor.pattern.test(line): + processor.visit(line); + break; + case sink.pattern.test(line): + sink.visit(line); + break; + case rightArrow.pattern.test(line): + rightArrow.visit(line); + break; + default: + break; + } + + } + + if(subTopologies.length) { + subTopologies.push(subTopology.endFormatter()); + } + + return ["graph TD "].concat(outside).concat(subTopologies).concat(topicList).concat(storeList).join('\n'); + } + } + var subTopology = { + pattern: /Sub-topology: ([0-9]*)/, + startFormatter: (subTopology) => `subgraph Sub-Topology: $\{subTopology}`, + endFormatter: () => `end`, + visit: function(line) { + var match = line.match(this.pattern); + // Close the previous sub-topology before opening a new one; + if(subTopologies.length) { + subTopologies.push(this.endFormatter()); + } + subTopologies.push(this.startFormatter(match[1])); + } + } + var source = { + pattern: /Source:\s+(\S+)\s+\(topics:\s+\[(.*)\]\)/, + formatter: (source, topic) => `$\{topic}[$\{topic}] --> $\{source}($\{name(source)})`, + visit: function(line) { + var match = line.match(this.pattern); + currentGraphNodeName = match[1].trim(); + var topics = match[2] + topics.split(',').filter(String).map(topic => topic.trim()).forEach(topic => { + outside.push(this.formatter(currentGraphNodeName, topic)); + topicList.push(topic); + }); + } + }; + + var processor = { + pattern: /Processor:\s+(\S+)\s+\(stores:\s+\[(.*)\]\)/, + formatter: (processor, store) => (processor.includes("JOIN")) ? `$\{store}[($\{name(store)})] --> $\{processor}($\{name(processor)})` : `$\{processor}($\{name(processor)}) --> $\{store}[($\{name(store)})]`, + visit: function(line) { + var match = line.match(this.pattern); + currentGraphNodeName = match[1].trim(); + var stores = match[2]; + stores.split(',').filter(String).map(store => store.trim()).forEach(store => { + outside.push(this.formatter(currentGraphNodeName, store)); + storeList.push(store); + }); + } + }; + + var sink = { + pattern: /Sink:\s+(\S+)\s+\(topic:\s+(.*)\)/, + formatter: (sink, topic) => `$\{sink}($\{name(sink)}) --> $\{topic}[$\{topic}]`, + visit: function(line) { + var match = line.match(this.pattern); + currentGraphNodeName = match[1].trim(); + var topic = match[2].trim(); + outside.push(this.formatter(currentGraphNodeName, topic)); + } + } + + var rightArrow = { + pattern: /\s*-->\s+(.*)/, + formatter: (src, dst) => `$\{src}($\{name(src)}) --> $\{dst}($\{name(dst)})`, + visit: function(line) { + var match = line.match(this.pattern); + match[1].split(',').filter(String).map(target => target.trim()).filter(target => target !== "none").forEach(target => { + subTopologies.push(this.formatter(currentGraphNodeName, target)) + }); + } + }; + + for(const line of lines) { + switch(true) { + case subTopology.pattern.test(line): + subTopology.visit(line); + break; + case source.pattern.test(line): + source.visit(line); + break; + case processor.pattern.test(line): + processor.visit(line); + break; + case sink.pattern.test(line): + sink.visit(line); + break; + case rightArrow.pattern.test(line): + rightArrow.visit(line); + break; + default: + break; + } + + } + + if(subTopologies.length) { + subTopologies.push(subTopology.endFormatter()); + } + + return ["graph TD"].concat(outside).concat(subTopologies).concat(topicList).concat(storeList).join('\n'); +} +mermaid.initialize(\{startOnLoad:false}); +$(function(){ + var topologyDescription = $('#topology-description').val(); + var mermaidGraphDefinition = toMermaid(topologyDescription); + console.log(mermaidGraphDefinition); + + mermaid.mermaidAPI.render("mermaid-graph-" + Date.now(), mermaidGraphDefinition, function(svgCode, bindFunctions){ + $('#graph').html(svgCode); + }); +}); +{/script} +{#title}Topology{/title} +{#body} +
+
+
+
+ + +
+
+
+
+
+
+
+
+{/body} +{/include} diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/TopologyDescriptionSupplier.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/TopologyDescriptionSupplier.java new file mode 100644 index 00000000000000..a07353abf24c78 --- /dev/null +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/TopologyDescriptionSupplier.java @@ -0,0 +1,17 @@ +package io.quarkus.kafka.streams.runtime; + +import java.util.function.Supplier; + +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyDescription; + +import io.quarkus.arc.Arc; + +public class TopologyDescriptionSupplier implements Supplier { + + @Override + public TopologyDescription get() { + Topology topology = Arc.container().instance(Topology.class).get(); + return topology.describe(); + } +} diff --git a/extensions/vertx-http/deployment/pom.xml b/extensions/vertx-http/deployment/pom.xml index a50d33017bba9c..b703d62722454d 100644 --- a/extensions/vertx-http/deployment/pom.xml +++ b/extensions/vertx-http/deployment/pom.xml @@ -269,6 +269,19 @@ + + + org.webjars.npm + mermaid + ${webjar.mermaid.version} + jar + true + ${project.build.directory}/classes/dev-static/js/ + **/mermaid.min.js, **/mermaid.min.js.map + + + +