Skip to content

Commit

Permalink
Introducing Kafka Streams DevUI
Browse files Browse the repository at this point in the history
  • Loading branch information
danielpetisme committed Oct 11, 2021
1 parent 81d5fb7 commit 912d14a
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 1 deletion.
1 change: 1 addition & 0 deletions build-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
<webjar.font-awesome.version>5.15.2</webjar.font-awesome.version>
<webjar.jquery.version>3.5.1</webjar.jquery.version>
<webjar.codemirror.version>5.62.0</webjar.codemirror.version>
<webjar.mermaid.version>8.9.1</webjar.mermaid.version>

<!-- revapi API check -->
<revapi-maven-plugin.version>0.12.1</revapi-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.RuntimeReinitializedClassBuildItem;
import io.quarkus.deployment.pkg.NativeConfig;
import io.quarkus.kafka.streams.runtime.*;
import io.quarkus.kafka.streams.runtime.KafkaStreamsProducer;
import io.quarkus.kafka.streams.runtime.KafkaStreamsRecorder;
import io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig;
Expand All @@ -53,7 +54,7 @@ void build(BuildProducer<FeatureBuildItem> feature,

registerClassesThatAreLoadedThroughReflection(reflectiveClasses, launchMode);
registerClassesThatAreAccessedViaJni(jniRuntimeAccessibleClasses);
addSupportForRocksDbLib(nativeLibs, config);
// addSupportForRocksDbLib(nativeLibs, config);
enableLoadOfNativeLibs(reinitialized);
}

Expand Down Expand Up @@ -184,4 +185,5 @@ void addHealthChecks(KafkaStreamsBuildTimeConfig buildTimeConfig, BuildProducer<
"io.quarkus.kafka.streams.runtime.health.KafkaStreamsStateHealthCheck",
buildTimeConfig.healthEnabled));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.quarkus.kafka.streams.deployment.devconsole;

import io.quarkus.deployment.IsDevelopment;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.devconsole.spi.DevConsoleRuntimeTemplateInfoBuildItem;
import io.quarkus.kafka.streams.runtime.TopologyDescriptionSupplier;

public class DevConsoleProcessor {

@BuildStep(onlyIf = IsDevelopment.class)
public DevConsoleRuntimeTemplateInfoBuildItem collectInfos() {
return new DevConsoleRuntimeTemplateInfoBuildItem("topology", new TopologyDescriptionSupplier());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<a href="{urlbase}/kafka-streams-topology" class="badge badge-light">
<i class="fa fa-project-diagram fa-fw"></i>
Topology </a>
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
{#include main fluid=true}
{#style}
textarea {
resize: none;
font-family: SFMono-Regular,Menlo,Monaco,Consolas,"Liberation Mono","Courier New",monospace;
}
{/style}
{#scriptref}
<script src="{devRootAppend}/resources/js/mermaid.min.js"></script>
{/scriptref}

{#script}
function toMermaid(topology) {
var lines = topology.split('\n');
var subTopologies = [];
var outside = [];
var currentGraphNodeName;
var topicList = [];
var storeList = [];
var name = (value) => value.replaceAll("-", "-<br>");

var graph = {
visit: function(lines) {
for(const line of lines) {
switch(true) {
case subTopology.pattern.test(line):
subTopology.visit(line);
break;
case source.pattern.test(line):
source.visit(line);
break;
case processor.pattern.test(line):
processor.visit(line);
break;
case sink.pattern.test(line):
sink.visit(line);
break;
case rightArrow.pattern.test(line):
rightArrow.visit(line);
break;
default:
break;
}

}

if(subTopologies.length) {
subTopologies.push(subTopology.endFormatter());
}

return ["graph TD "].concat(outside).concat(subTopologies).concat(topicList).concat(storeList).join('\n');
}
}
var subTopology = {
pattern: /Sub-topology: ([0-9]*)/,
startFormatter: (subTopology) => `subgraph Sub-Topology: $\{subTopology}`,
endFormatter: () => `end`,
visit: function(line) {
var match = line.match(this.pattern);
// Close the previous sub-topology before opening a new one;
if(subTopologies.length) {
subTopologies.push(this.endFormatter());
}
subTopologies.push(this.startFormatter(match[1]));
}
}
var source = {
pattern: /Source:\s+(\S+)\s+\(topics:\s+\[(.*)\]\)/,
formatter: (source, topic) => `$\{topic}[$\{topic}] --> $\{source}($\{name(source)})`,
visit: function(line) {
var match = line.match(this.pattern);
currentGraphNodeName = match[1].trim();
var topics = match[2]
topics.split(',').filter(String).map(topic => topic.trim()).forEach(topic => {
outside.push(this.formatter(currentGraphNodeName, topic));
topicList.push(topic);
});
}
};

var processor = {
pattern: /Processor:\s+(\S+)\s+\(stores:\s+\[(.*)\]\)/,
formatter: (processor, store) => (processor.includes("JOIN")) ? `$\{store}[($\{name(store)})] --> $\{processor}($\{name(processor)})` : `$\{processor}($\{name(processor)}) --> $\{store}[($\{name(store)})]`,
visit: function(line) {
var match = line.match(this.pattern);
currentGraphNodeName = match[1].trim();
var stores = match[2];
stores.split(',').filter(String).map(store => store.trim()).forEach(store => {
outside.push(this.formatter(currentGraphNodeName, store));
storeList.push(store);
});
}
};

var sink = {
pattern: /Sink:\s+(\S+)\s+\(topic:\s+(.*)\)/,
formatter: (sink, topic) => `$\{sink}($\{name(sink)}) --> $\{topic}[$\{topic}]`,
visit: function(line) {
var match = line.match(this.pattern);
currentGraphNodeName = match[1].trim();
var topic = match[2].trim();
outside.push(this.formatter(currentGraphNodeName, topic));
}
}

var rightArrow = {
pattern: /\s*-->\s+(.*)/,
formatter: (src, dst) => `$\{src}($\{name(src)}) --> $\{dst}($\{name(dst)})`,
visit: function(line) {
var match = line.match(this.pattern);
match[1].split(',').filter(String).map(target => target.trim()).filter(target => target !== "none").forEach(target => {
subTopologies.push(this.formatter(currentGraphNodeName, target))
});
}
};

for(const line of lines) {
switch(true) {
case subTopology.pattern.test(line):
subTopology.visit(line);
break;
case source.pattern.test(line):
source.visit(line);
break;
case processor.pattern.test(line):
processor.visit(line);
break;
case sink.pattern.test(line):
sink.visit(line);
break;
case rightArrow.pattern.test(line):
rightArrow.visit(line);
break;
default:
break;
}

}

if(subTopologies.length) {
subTopologies.push(subTopology.endFormatter());
}

return ["graph TD"].concat(outside).concat(subTopologies).concat(topicList).concat(storeList).join('\n');
}
mermaid.initialize(\{startOnLoad:false});
$(function(){
var topologyDescription = $('#topology-description').val();
var mermaidGraphDefinition = toMermaid(topologyDescription);
console.log(mermaidGraphDefinition);

mermaid.mermaidAPI.render("mermaid-graph-" + Date.now(), mermaidGraphDefinition, function(svgCode, bindFunctions){
$('#graph').html(svgCode);
});
});
{/script}
{#title}Topology{/title}
{#body}
<div class="row">
<div class="col-5 offset-1">
<form>
<div class="form-group">
<label for="topology-description" class="col-form-label h1">Kafka Streams Topology</label>
<textarea id="topology-description" readonly
class="form-control-plaintext p-3 mb-5 bg-white r border rounded"
rows='{info:topology.toString().split("\r\n|\r|\n").length} + 1'>
{info:topology.toString()}
</textarea>
</div>
</form>
</div>
<div class="col-5">
<div id="graph">
</div>
</div>
</div>
{/body}
{/include}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.quarkus.kafka.streams.runtime;

import java.util.function.Supplier;

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;

import io.quarkus.arc.Arc;

public class TopologyDescriptionSupplier implements Supplier<TopologyDescription> {

@Override
public TopologyDescription get() {
Topology topology = Arc.container().instance(Topology.class).get();
return topology.describe();
}
}
13 changes: 13 additions & 0 deletions extensions/vertx-http/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,19 @@
<org.codehaus.plexus.components.io.filemappers.FlattenFileMapper/>
</fileMappers>
</artifactItem>
<!-- Mermaid -->
<artifactItem>
<groupId>org.webjars.npm</groupId>
<artifactId>mermaid</artifactId>
<version>${webjar.mermaid.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes/dev-static/js/</outputDirectory>
<includes>**/mermaid.min.js, **/mermaid.min.js.map</includes>
<fileMappers>
<org.codehaus.plexus.components.io.filemappers.FlattenFileMapper/>
</fileMappers>
</artifactItem>
</artifactItems>
</configuration>
</execution>
Expand Down

0 comments on commit 912d14a

Please sign in to comment.