diff --git a/build-parent/pom.xml b/build-parent/pom.xml index 276567c91cab2..19ce068d52829 100644 --- a/build-parent/pom.xml +++ b/build-parent/pom.xml @@ -118,6 +118,7 @@ 5.15.2 3.5.1 5.62.0 + 8.9.1 0.12.1 diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/devconsole/DevConsoleProcessor.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/devconsole/DevConsoleProcessor.java new file mode 100644 index 0000000000000..ffce789d55735 --- /dev/null +++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/devconsole/DevConsoleProcessor.java @@ -0,0 +1,15 @@ +package io.quarkus.kafka.streams.deployment.devconsole; + +import io.quarkus.deployment.IsDevelopment; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.devconsole.spi.DevConsoleRuntimeTemplateInfoBuildItem; +import io.quarkus.kafka.streams.runtime.TopologySupplier; + +public class DevConsoleProcessor { + + @BuildStep(onlyIf = IsDevelopment.class) + public DevConsoleRuntimeTemplateInfoBuildItem collectInfos() { + return new DevConsoleRuntimeTemplateInfoBuildItem("topology", new TopologySupplier()); + } + +} diff --git a/extensions/kafka-streams/deployment/src/main/resources/dev-templates/embedded.html b/extensions/kafka-streams/deployment/src/main/resources/dev-templates/embedded.html new file mode 100644 index 0000000000000..68aca76e2bebe --- /dev/null +++ b/extensions/kafka-streams/deployment/src/main/resources/dev-templates/embedded.html @@ -0,0 +1,3 @@ + + + Topology diff --git a/extensions/kafka-streams/deployment/src/main/resources/dev-templates/kafka-streams-topology.html b/extensions/kafka-streams/deployment/src/main/resources/dev-templates/kafka-streams-topology.html new file mode 100644 index 0000000000000..4165d8f12e2f7 --- /dev/null +++ b/extensions/kafka-streams/deployment/src/main/resources/dev-templates/kafka-streams-topology.html @@ -0,0 +1,221 @@ +{#include main fluid=true} +{#style} +#topology-description { + resize: none; + font-family: SFMono-Regular,Menlo,Monaco,Consolas,"Liberation Mono","Courier New",monospace; +} +{/style} +{#scriptref} + +{/scriptref} + +{#script} +function toMermaid(topology) { + var lines = topology.split('\n'); + var subTopologies = []; + var outside = []; + var currentGraphNodeName; + var subTopologiesList = []; + var topicSourcesList = []; + var topicSinksList = []; + var stateStoresList = []; + var name = (value) => value.replaceAll("-", "-
"); + + var subTopology = { + pattern: /Sub-topology: ([0-9]*)/, + startFormatter: (subTopology) => `subgraph Sub-Topology: $\{subTopology}`, + endFormatter: () => `end`, + visit: function(line) { + var match = line.match(this.pattern); + // Close the previous sub-topology before opening a new one; + if(subTopologies.length) { + subTopologies.push(this.endFormatter()); + } + subTopologies.push(this.startFormatter(match[1])); + subTopologiesList.push(match[1]); + } + } + var source = { + pattern: /Source:\s+(\S+)\s+\(topics:\s+\[(.*)\]\)/, + formatter: (source, topic) => `$\{topic}[$\{topic}] --> $\{source}($\{name(source)})`, + visit: function(line) { + var match = line.match(this.pattern); + currentGraphNodeName = match[1].trim(); + var topics = match[2] + topics.split(',').filter(String).map(topic => topic.trim()).forEach(topic => { + outside.push(this.formatter(currentGraphNodeName, topic)); + topicSourcesList.push(topic); + }); + } + }; + + var processor = { + pattern: /Processor:\s+(\S+)\s+\(stores:\s+\[(.*)\]\)/, + formatter: (processor, store) => (processor.includes("JOIN")) ? `$\{store}[($\{name(store)})] --> $\{processor}($\{name(processor)})` : `$\{processor}($\{name(processor)}) --> $\{store}[($\{name(store)})]`, + visit: function(line) { + var match = line.match(this.pattern); + currentGraphNodeName = match[1].trim(); + var stores = match[2]; + stores.split(',').filter(String).map(store => store.trim()).forEach(store => { + outside.push(this.formatter(currentGraphNodeName, store)); + stateStoresList.push(store); + }); + } + }; + + var sink = { + pattern: /Sink:\s+(\S+)\s+\(topic:\s+(.*)\)/, + formatter: (sink, topic) => `$\{sink}($\{name(sink)}) --> $\{topic}[$\{topic}]`, + visit: function(line) { + var match = line.match(this.pattern); + currentGraphNodeName = match[1].trim(); + var topic = match[2].trim(); + outside.push(this.formatter(currentGraphNodeName, topic)); + topicSinksList.push(topic); + } + } + + var rightArrow = { + pattern: /\s*-->\s+(.*)/, + formatter: (src, dst) => `$\{src}($\{name(src)}) --> $\{dst}($\{name(dst)})`, + visit: function(line) { + var match = line.match(this.pattern); + match[1].split(',').filter(String).map(target => target.trim()).filter(target => target !== "none").forEach(target => { + subTopologies.push(this.formatter(currentGraphNodeName, target)) + }); + } + }; + + for(const line of lines) { + switch(true) { + case subTopology.pattern.test(line): + subTopology.visit(line); + break; + case source.pattern.test(line): + source.visit(line); + break; + case processor.pattern.test(line): + processor.visit(line); + break; + case sink.pattern.test(line): + sink.visit(line); + break; + case rightArrow.pattern.test(line): + rightArrow.visit(line); + break; + default: + break; + } + + } + + if(subTopologies.length) { + subTopologies.push(subTopology.endFormatter()); + } + + var description = ["graph TD"].concat(outside).concat(subTopologies).concat(topicSourcesList).concat(topicSinksList).concat(stateStoresList).join('\n'); + + return { + description: description, + details: { + subTopologies: subTopologiesList, + topicSources: topicSourcesList, + topicSinks: topicSinksList, + stateStores: stateStoresList + } + }; +} +mermaid.initialize(\{startOnLoad:false}); +$(function(){ + var topologyDescription = $('#topology-description').val(); + var mermaidGraphDefinition = toMermaid(topologyDescription); + console.log(mermaidGraphDefinition.description); + + mermaid.mermaidAPI.render("mermaid-graph-" + Date.now(), mermaidGraphDefinition.description, function(svgCode, bindFunctions){ + $('#topology-graph').html(svgCode); + }); + + $('#sub-topologies-details').html(mermaidGraphDefinition.details.subTopologies.length); + $('#topic-sources-details').text(mermaidGraphDefinition.details.topicSources.length); + $('#topic-sinks-details').text(mermaidGraphDefinition.details.topicSinks.length); + $('#state-stores-details').text(mermaidGraphDefinition.details.stateStores.length); + + mermaidGraphDefinition.details.topicSources.sort().forEach(topic => { + $('#topic-sources-list').append(`
  • $\{topic}
  • `) + }); + + mermaidGraphDefinition.details.topicSinks.sort().forEach(topic => { + $('#topic-sinks-list').append(`
  • $\{topic}
  • `) + }); + + mermaidGraphDefinition.details.stateStores.sort().forEach(store => { + $('#state-stores-list').append(`
  • $\{store}
  • `) + }); + +}); +{/script} +{#title}Topology{/title} +{#body} +{#if info:topology} +
    +
    +

    Details

    +
    +
    +
    +
    +
    Sub-topologies: 0
    +
    +
    +
    Topic sources: 0
    +
    +
    +
    Topic sinks: 0
    +
    +
    +
    State stores: 0
    +
    +
    +
    +
    +
    +
    + +
    +
    + +
    +
    + +
    +
    +
    +
    +

    Description

    +
    +
    +
    +
    +
    +
    + +
    +
    +
    +
    +
    +
    +
    +
    +{#else} +{#topologyNotFound /} +{/if} +{/body} +{/include} diff --git a/extensions/kafka-streams/deployment/src/main/resources/dev-templates/tags/topologyNotFound.html b/extensions/kafka-streams/deployment/src/main/resources/dev-templates/tags/topologyNotFound.html new file mode 100644 index 0000000000000..7caa3fb578494 --- /dev/null +++ b/extensions/kafka-streams/deployment/src/main/resources/dev-templates/tags/topologyNotFound.html @@ -0,0 +1,16 @@ +
    +
    +
    +

    No topology found.

    +

    + Verify you have an @ApplicationScoped bean which defines a CDI producer method with a @Produces annotation returning the Kafka Streams Topology. +

    +
    +

    + + Quarkus Apache Kafka Streams guide + +

    +
    +
    +
    diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/TopologySupplier.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/TopologySupplier.java new file mode 100644 index 0000000000000..1bb1eb4e3f0ba --- /dev/null +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/TopologySupplier.java @@ -0,0 +1,15 @@ +package io.quarkus.kafka.streams.runtime; + +import java.util.function.Supplier; + +import org.apache.kafka.streams.Topology; + +import io.quarkus.arc.Arc; + +public class TopologySupplier implements Supplier { + + @Override + public Topology get() { + return Arc.container().instance(Topology.class).get(); + } +} diff --git a/extensions/vertx-http/deployment/pom.xml b/extensions/vertx-http/deployment/pom.xml index a50d33017bba9..b703d62722454 100644 --- a/extensions/vertx-http/deployment/pom.xml +++ b/extensions/vertx-http/deployment/pom.xml @@ -269,6 +269,19 @@ + + + org.webjars.npm + mermaid + ${webjar.mermaid.version} + jar + true + ${project.build.directory}/classes/dev-static/js/ + **/mermaid.min.js, **/mermaid.min.js.map + + + +