-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #20663 from danielpetisme/devservices-kafkastreams…
…-toplogy-viewer Introducing Kafka Streams DevUI
- Loading branch information
Showing
7 changed files
with
284 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
15 changes: 15 additions & 0 deletions
15
...ent/src/main/java/io/quarkus/kafka/streams/deployment/devconsole/DevConsoleProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package io.quarkus.kafka.streams.deployment.devconsole; | ||
|
||
import io.quarkus.deployment.IsDevelopment; | ||
import io.quarkus.deployment.annotations.BuildStep; | ||
import io.quarkus.devconsole.spi.DevConsoleRuntimeTemplateInfoBuildItem; | ||
import io.quarkus.kafka.streams.runtime.TopologySupplier; | ||
|
||
public class DevConsoleProcessor { | ||
|
||
@BuildStep(onlyIf = IsDevelopment.class) | ||
public DevConsoleRuntimeTemplateInfoBuildItem collectInfos() { | ||
return new DevConsoleRuntimeTemplateInfoBuildItem("topology", new TopologySupplier()); | ||
} | ||
|
||
} |
3 changes: 3 additions & 0 deletions
3
extensions/kafka-streams/deployment/src/main/resources/dev-templates/embedded.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
<a href="{urlbase}/kafka-streams-topology" class="badge badge-light"> | ||
<i class="fa fa-project-diagram fa-fw"></i> | ||
Topology </a> |
221 changes: 221 additions & 0 deletions
221
...ons/kafka-streams/deployment/src/main/resources/dev-templates/kafka-streams-topology.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
{#include main fluid=true} | ||
{#style} | ||
#topology-description { | ||
resize: none; | ||
font-family: SFMono-Regular,Menlo,Monaco,Consolas,"Liberation Mono","Courier New",monospace; | ||
} | ||
{/style} | ||
{#scriptref} | ||
<script src="{devRootAppend}/resources/js/mermaid.min.js"></script> | ||
{/scriptref} | ||
|
||
{#script} | ||
function toMermaid(topology) { | ||
var lines = topology.split('\n'); | ||
var subTopologies = []; | ||
var outside = []; | ||
var currentGraphNodeName; | ||
var subTopologiesList = []; | ||
var topicSourcesList = []; | ||
var topicSinksList = []; | ||
var stateStoresList = []; | ||
var name = (value) => value.replaceAll("-", "-<br>"); | ||
|
||
var subTopology = { | ||
pattern: /Sub-topology: ([0-9]*)/, | ||
startFormatter: (subTopology) => `subgraph Sub-Topology: $\{subTopology}`, | ||
endFormatter: () => `end`, | ||
visit: function(line) { | ||
var match = line.match(this.pattern); | ||
// Close the previous sub-topology before opening a new one; | ||
if(subTopologies.length) { | ||
subTopologies.push(this.endFormatter()); | ||
} | ||
subTopologies.push(this.startFormatter(match[1])); | ||
subTopologiesList.push(match[1]); | ||
} | ||
} | ||
var source = { | ||
pattern: /Source:\s+(\S+)\s+\(topics:\s+\[(.*)\]\)/, | ||
formatter: (source, topic) => `$\{topic}[$\{topic}] --> $\{source}($\{name(source)})`, | ||
visit: function(line) { | ||
var match = line.match(this.pattern); | ||
currentGraphNodeName = match[1].trim(); | ||
var topics = match[2] | ||
topics.split(',').filter(String).map(topic => topic.trim()).forEach(topic => { | ||
outside.push(this.formatter(currentGraphNodeName, topic)); | ||
topicSourcesList.push(topic); | ||
}); | ||
} | ||
}; | ||
|
||
var processor = { | ||
pattern: /Processor:\s+(\S+)\s+\(stores:\s+\[(.*)\]\)/, | ||
formatter: (processor, store) => (processor.includes("JOIN")) ? `$\{store}[($\{name(store)})] --> $\{processor}($\{name(processor)})` : `$\{processor}($\{name(processor)}) --> $\{store}[($\{name(store)})]`, | ||
visit: function(line) { | ||
var match = line.match(this.pattern); | ||
currentGraphNodeName = match[1].trim(); | ||
var stores = match[2]; | ||
stores.split(',').filter(String).map(store => store.trim()).forEach(store => { | ||
outside.push(this.formatter(currentGraphNodeName, store)); | ||
stateStoresList.push(store); | ||
}); | ||
} | ||
}; | ||
|
||
var sink = { | ||
pattern: /Sink:\s+(\S+)\s+\(topic:\s+(.*)\)/, | ||
formatter: (sink, topic) => `$\{sink}($\{name(sink)}) --> $\{topic}[$\{topic}]`, | ||
visit: function(line) { | ||
var match = line.match(this.pattern); | ||
currentGraphNodeName = match[1].trim(); | ||
var topic = match[2].trim(); | ||
outside.push(this.formatter(currentGraphNodeName, topic)); | ||
topicSinksList.push(topic); | ||
} | ||
} | ||
|
||
var rightArrow = { | ||
pattern: /\s*-->\s+(.*)/, | ||
formatter: (src, dst) => `$\{src}($\{name(src)}) --> $\{dst}($\{name(dst)})`, | ||
visit: function(line) { | ||
var match = line.match(this.pattern); | ||
match[1].split(',').filter(String).map(target => target.trim()).filter(target => target !== "none").forEach(target => { | ||
subTopologies.push(this.formatter(currentGraphNodeName, target)) | ||
}); | ||
} | ||
}; | ||
|
||
for(const line of lines) { | ||
switch(true) { | ||
case subTopology.pattern.test(line): | ||
subTopology.visit(line); | ||
break; | ||
case source.pattern.test(line): | ||
source.visit(line); | ||
break; | ||
case processor.pattern.test(line): | ||
processor.visit(line); | ||
break; | ||
case sink.pattern.test(line): | ||
sink.visit(line); | ||
break; | ||
case rightArrow.pattern.test(line): | ||
rightArrow.visit(line); | ||
break; | ||
default: | ||
break; | ||
} | ||
|
||
} | ||
|
||
if(subTopologies.length) { | ||
subTopologies.push(subTopology.endFormatter()); | ||
} | ||
|
||
var description = ["graph TD"].concat(outside).concat(subTopologies).concat(topicSourcesList).concat(topicSinksList).concat(stateStoresList).join('\n'); | ||
|
||
return { | ||
description: description, | ||
details: { | ||
subTopologies: subTopologiesList, | ||
topicSources: topicSourcesList, | ||
topicSinks: topicSinksList, | ||
stateStores: stateStoresList | ||
} | ||
}; | ||
} | ||
mermaid.initialize(\{startOnLoad:false}); | ||
$(function(){ | ||
var topologyDescription = $('#topology-description').val(); | ||
var mermaidGraphDefinition = toMermaid(topologyDescription); | ||
console.log(mermaidGraphDefinition.description); | ||
|
||
mermaid.mermaidAPI.render("mermaid-graph-" + Date.now(), mermaidGraphDefinition.description, function(svgCode, bindFunctions){ | ||
$('#topology-graph').html(svgCode); | ||
}); | ||
|
||
$('#sub-topologies-details').html(mermaidGraphDefinition.details.subTopologies.length); | ||
$('#topic-sources-details').text(mermaidGraphDefinition.details.topicSources.length); | ||
$('#topic-sinks-details').text(mermaidGraphDefinition.details.topicSinks.length); | ||
$('#state-stores-details').text(mermaidGraphDefinition.details.stateStores.length); | ||
|
||
mermaidGraphDefinition.details.topicSources.sort().forEach(topic => { | ||
$('#topic-sources-list').append(`<li><span class="badge badge-pill badge-primary">$\{topic}</span></li>`) | ||
}); | ||
|
||
mermaidGraphDefinition.details.topicSinks.sort().forEach(topic => { | ||
$('#topic-sinks-list').append(`<li><span class="badge badge-pill badge-primary">$\{topic}</span></li>`) | ||
}); | ||
|
||
mermaidGraphDefinition.details.stateStores.sort().forEach(store => { | ||
$('#state-stores-list').append(`<li><span class="badge badge-pill badge-primary">$\{store}</span></li>`) | ||
}); | ||
|
||
}); | ||
{/script} | ||
{#title}Topology{/title} | ||
{#body} | ||
{#if info:topology} | ||
<div class="row"> | ||
<div class="col-5 offset-1"> | ||
<h3>Details</h3> | ||
</div> | ||
</div> | ||
<div class="row"> | ||
<div class="col-2 offset-1"> | ||
<h6>Sub-topologies: <span id="sub-topologies-details" class="badge badge-pill badge-secondary">0</span></h6> | ||
</div> | ||
<div class="col-2"> | ||
<h6>Topic sources: <span id="topic-sources-details" class="badge badge-pill badge-secondary">0</span></h6> | ||
</div> | ||
<div class="col-2"> | ||
<h6>Topic sinks: <span id="topic-sinks-details" class="badge badge-pill badge-secondary">0</span></h6> | ||
</div> | ||
<div class="col-2"> | ||
<h6>State stores: <span id="state-stores-details" class="badge badge-pill badge-secondary">0</span></h6> | ||
</div> | ||
</div> | ||
<div class="row"> | ||
<div class="col-2 offset-1"> | ||
</div> | ||
<div class="col-2"> | ||
<ul id="topic-sources-list" class="list-unstyled"> | ||
</ul> | ||
</div> | ||
<div class="col-2"> | ||
<ul id="topic-sinks-list" class="list-unstyled"> | ||
</ul> | ||
</div> | ||
<div class="col-2"> | ||
<ul id="state-stores-list" class="list-unstyled"> | ||
</ul> | ||
</div> | ||
</div> | ||
<div class="row"> | ||
<div class="col-5 offset-1"> | ||
<h3>Description</h3> | ||
</div> | ||
</div> | ||
<div class="row"> | ||
<div class="col-5 offset-1"> | ||
<form> | ||
<div class="form-group"> | ||
<textarea id="topology-description" readonly | ||
class="form-control-plaintext p-3 mb-5 bg-white r border rounded" | ||
rows='{info:topology.describe().toString().split("\r\n|\r|\n").length} + 1'> | ||
{info:topology.describe().toString()} | ||
</textarea> | ||
</div> | ||
</form> | ||
</div> | ||
<div class="col-5"> | ||
<div id="topology-graph"> | ||
</div> | ||
</div> | ||
</div> | ||
{#else} | ||
{#topologyNotFound /} | ||
{/if} | ||
{/body} | ||
{/include} |
16 changes: 16 additions & 0 deletions
16
...ions/kafka-streams/deployment/src/main/resources/dev-templates/tags/topologyNotFound.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
<div class="row justify-content-center"> | ||
<div class="col-6"> | ||
<div class="jumbotron"> | ||
<h1 class="display-4">No topology found.</h1> | ||
<p class="lead"> | ||
Verify you have an <code>@ApplicationScoped</code> bean which defines a CDI producer method with a <code>@Produces</code> annotation returning the Kafka Streams <code>Topology</code>. | ||
</p> | ||
<hr class="my-4"> | ||
<p class="lead"> | ||
<a class="btn btn-primary btn-lg" href="https://quarkus.io/guides/kafka-streams" role="button"> | ||
<i class="fa fa-book"></i> Quarkus Apache Kafka Streams guide | ||
</a> | ||
</p> | ||
</div> | ||
</div> | ||
</div> |
15 changes: 15 additions & 0 deletions
15
...afka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/TopologySupplier.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package io.quarkus.kafka.streams.runtime; | ||
|
||
import java.util.function.Supplier; | ||
|
||
import org.apache.kafka.streams.Topology; | ||
|
||
import io.quarkus.arc.Arc; | ||
|
||
public class TopologySupplier implements Supplier<Topology> { | ||
|
||
@Override | ||
public Topology get() { | ||
return Arc.container().instance(Topology.class).get(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters