From f647a5b86ace2a09f0f30a872ab90f3faddc14c9 Mon Sep 17 00:00:00 2001 From: David Cotton Date: Sat, 4 Nov 2023 17:28:30 +0100 Subject: [PATCH] Adds Pattern support to Kafka Streams Topology Dev UI --- .../devui/KafkaStreamsJsonRPCService.java | 8 +- .../runtime/devui/TopologyParserContext.java | 32 +++-- .../devui/KafkaStreamsJsonRPCServiceTest.java | 112 ++++++++++-------- 3 files changed, 94 insertions(+), 58 deletions(-) diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCService.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCService.java index fc82604f41658..24f25e424e5c2 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCService.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCService.java @@ -66,7 +66,7 @@ public void accept(TopologyParserContext context) { private static final RawTopologyItemParser SOURCE = new RawTopologyItemParser() { private final Pattern sourcePattern = Pattern - .compile("Source:\\s+(?\\S+)\\s+\\(topics:\\s+\\[(?.*)\\]\\).*"); + .compile("Source:\\s+(?\\S+)\\s+\\(topics:\\s+((\\[(?.*)\\])|(?.*)\\)).*"); private Matcher matcher; @Override @@ -77,7 +77,11 @@ public boolean test(String line) { @Override public void accept(TopologyParserContext context) { - context.addSources(matcher.group("source"), matcher.group("topics").split(",")); + if (matcher.group("topics") != null) { + context.addSources(matcher.group("source"), matcher.group("topics").split(",")); + } else if (matcher.group("regex") != null) { + context.addRegexSource(matcher.group("source"), matcher.group("regex")); + } } }; diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java index cdbf1ec7c1a51..6a047065af724 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/devui/TopologyParserContext.java @@ -41,6 +41,16 @@ void addSources(String source, String[] topics) { }); } + void addRegexSource(String source, String regex) { + currentNode = source; + final var cleanRegex = regex.trim(); + if (!cleanRegex.isEmpty()) { + sources.add(cleanRegex); + graphviz.addRegexSource(source, cleanRegex); + mermaid.addRegexSource(source, cleanRegex); + } + } + void addStores(String[] stores, String processor, boolean join) { currentNode = processor; Arrays.stream(stores) @@ -71,8 +81,8 @@ String toGraph() { final var res = new ArrayList(); res.add("digraph {"); - res.add(" fontname=\"Helvetica\"; fontsize=\"10\";"); - res.add(" node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=\"Helvetica\" fontsize=\"10\"];"); + res.add(" fontname=Helvetica; fontsize=10;"); + res.add(" node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=Helvetica fontsize=10];"); nodes.forEach(n -> res.add(' ' + n + ';')); subGraphs.entrySet().forEach(e -> { res.add(" subgraph cluster" + e.getKey() + " {"); @@ -80,11 +90,6 @@ String toGraph() { e.getValue().forEach(v -> res.add(" " + v + ';')); res.add(" }"); }); - for (int i = 0; i < subGraphs.size(); i++) { - res.add(" subgraph cluster" + i + " {"); - res.add(" label=\"Sub-Topology: " + i + "\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";"); - res.add(" }"); - } edges.forEach(e -> res.add(' ' + e + ';')); res.add("}"); @@ -108,6 +113,15 @@ private void addSource(String source, String topic) { subGraphs.get(currentGraph).add(toId(source)); } + private void addRegexSource(String source, String regex) { + final var regexId = "REGEX_" + nodes.size(); + final var regexLabel = regex.replaceAll("\\\\", "\\\\\\\\"); + nodes.add(regexId + " [label=\"" + regexLabel + "\" shape=invhouse style=dashed margin=\"0,0\"]"); + nodes.add(toId(source) + " [label=\"" + toLabel(source) + "\"]"); + edges.add(regexId + " -> " + toId(source)); + subGraphs.get(currentGraph).add(toId(source)); + } + private void addTarget(String target, String node) { nodes.add(toId(target) + " [label=\"" + toLabel(target) + "\"]"); edges.add(toId(node) + " -> " + toId(target)); @@ -164,6 +178,10 @@ private void addSource(String source, String topic) { endpoints.add(topic + '[' + topic + "] --> " + source + '(' + toName(source) + ')'); } + private void addRegexSource(String source, String regex) { + endpoints.add("REGEX_" + endpoints.size() + '[' + regex + "] --> " + source + '(' + toName(source) + ')'); + } + private void addTarget(String target, String node) { subTopologies.add(' ' + node + '[' + toName(node) + "] --> " + target + '(' + toName(target) + ')'); } diff --git a/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java b/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java index 8c40491616d87..854ac92d97251 100644 --- a/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java +++ b/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/devui/KafkaStreamsJsonRPCServiceTest.java @@ -31,66 +31,77 @@ public void shouldParsingStayConstant() { + " --> KSTREAM-SINK-0000000007\n" + " <-- KSTREAM-AGGREGATE-0000000005\n" + " Sink: KSTREAM-SINK-0000000007 (topic: temperatures-aggregated)\n" - + " <-- KTABLE-TOSTREAM-0000000006"; + + " <-- KTABLE-TOSTREAM-0000000006\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000008 (topics: notification\\..+)\n" + + " --> KSTREAM-FOREACH-0000000009\n" + + " Processor: KSTREAM-FOREACH-0000000009 (stores: [])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000008"; final var actual = rpcService.parseTopologyDescription(expectedDescribe); assertEquals(expectedDescribe, actual.getString("describe")); - assertEquals("[0, 1]", actual.getString("subTopologies")); - assertEquals("[temperature-values, weather-stations]", actual.getString("sources")); + assertEquals("[0, 1, 2]", actual.getString("subTopologies")); + assertEquals("[notification\\..+, temperature-values, weather-stations]", actual.getString("sources")); assertEquals("[temperatures-aggregated]", actual.getString("sinks")); assertEquals("[weather-stations-STATE-STORE-0000000000, weather-stations-store]", actual.getString("stores")); - assertEquals("digraph {\n" + - " fontname=\"Helvetica\"; fontsize=\"10\";\n" + - " node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=\"Helvetica\" fontsize=\"10\"];\n" + - " weather_stations [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n" + - " KSTREAM_SOURCE_0000000001 [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n" + - " KTABLE_SOURCE_0000000002 [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n" + - " weather_stations_STATE_STORE_0000000000 [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n" - + - " temperature_values [label=\"temperature\\nvalues\" shape=invhouse margin=\"0,0\"];\n" + - " KSTREAM_SOURCE_0000000003 [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n" + - " KSTREAM_LEFTJOIN_0000000004 [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n" + - " KSTREAM_AGGREGATE_0000000005 [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n" + - " weather_stations_store [label=\"weather\\nstations\\nstore\" shape=cylinder];\n" + - " KTABLE_TOSTREAM_0000000006 [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n" + - " KSTREAM_SINK_0000000007 [label=\"KSTREAM\\nSINK\\n0000000007\"];\n" + - " temperatures_aggregated [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n" + - " subgraph cluster0 {\n" + - " label=\"Sub-Topology: 0\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" + - " KSTREAM_SOURCE_0000000001;\n" + - " KTABLE_SOURCE_0000000002;\n" + - " }\n" + - " subgraph cluster1 {\n" + - " label=\"Sub-Topology: 1\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" + - " KSTREAM_SOURCE_0000000003;\n" + - " KSTREAM_LEFTJOIN_0000000004;\n" + - " KSTREAM_AGGREGATE_0000000005;\n" + - " KTABLE_TOSTREAM_0000000006;\n" + - " KSTREAM_SINK_0000000007;\n" + - " }\n" + - " subgraph cluster0 {\n" + - " label=\"Sub-Topology: 0\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" + - " }\n" + - " subgraph cluster1 {\n" + - " label=\"Sub-Topology: 1\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" + - " }\n" + - " weather_stations -> KSTREAM_SOURCE_0000000001;\n" + - " KSTREAM_SOURCE_0000000001 -> KTABLE_SOURCE_0000000002;\n" + - " KTABLE_SOURCE_0000000002 -> weather_stations_STATE_STORE_0000000000;\n" + - " temperature_values -> KSTREAM_SOURCE_0000000003;\n" + - " KSTREAM_SOURCE_0000000003 -> KSTREAM_LEFTJOIN_0000000004;\n" + - " KSTREAM_LEFTJOIN_0000000004 -> KSTREAM_AGGREGATE_0000000005;\n" + - " KSTREAM_AGGREGATE_0000000005 -> weather_stations_store;\n" + - " KSTREAM_AGGREGATE_0000000005 -> KTABLE_TOSTREAM_0000000006;\n" + - " KTABLE_TOSTREAM_0000000006 -> KSTREAM_SINK_0000000007;\n" + - " KSTREAM_SINK_0000000007 -> temperatures_aggregated;\n" + - "}", actual.getString("graphviz")); + assertEquals("digraph {\n" + + " fontname=Helvetica; fontsize=10;\n" + + " node [style=filled fillcolor=white color=\"#C9B7DD\" shape=box fontname=Helvetica fontsize=10];\n" + + " weather_stations [label=\"weather\\nstations\" shape=invhouse margin=\"0,0\"];\n" + + " KSTREAM_SOURCE_0000000001 [label=\"KSTREAM\\nSOURCE\\n0000000001\"];\n" + + " KTABLE_SOURCE_0000000002 [label=\"KTABLE\\nSOURCE\\n0000000002\"];\n" + + " weather_stations_STATE_STORE_0000000000 [label=\"weather\\nstations\\nSTATE\\nSTORE\\n0000000000\" shape=cylinder];\n" + + " temperature_values [label=\"temperature\\nvalues\" shape=invhouse margin=\"0,0\"];\n" + + " KSTREAM_SOURCE_0000000003 [label=\"KSTREAM\\nSOURCE\\n0000000003\"];\n" + + " KSTREAM_LEFTJOIN_0000000004 [label=\"KSTREAM\\nLEFTJOIN\\n0000000004\"];\n" + + " KSTREAM_AGGREGATE_0000000005 [label=\"KSTREAM\\nAGGREGATE\\n0000000005\"];\n" + + " weather_stations_store [label=\"weather\\nstations\\nstore\" shape=cylinder];\n" + + " KTABLE_TOSTREAM_0000000006 [label=\"KTABLE\\nTOSTREAM\\n0000000006\"];\n" + + " KSTREAM_SINK_0000000007 [label=\"KSTREAM\\nSINK\\n0000000007\"];\n" + + " temperatures_aggregated [label=\"temperatures\\naggregated\" shape=house margin=\"0,0\"];\n" + + " REGEX_12 [label=\"notification\\\\..+\" shape=invhouse style=dashed margin=\"0,0\"];\n" + + " KSTREAM_SOURCE_0000000008 [label=\"KSTREAM\\nSOURCE\\n0000000008\"];\n" + + " KSTREAM_FOREACH_0000000009 [label=\"KSTREAM\\nFOREACH\\n0000000009\"];\n" + + " subgraph cluster0 {\n" + + " label=\"Sub-Topology: 0\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" + + " KSTREAM_SOURCE_0000000001;\n" + + " KTABLE_SOURCE_0000000002;\n" + + " }\n" + + " subgraph cluster1 {\n" + + " label=\"Sub-Topology: 1\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" + + " KSTREAM_SOURCE_0000000003;\n" + + " KSTREAM_LEFTJOIN_0000000004;\n" + + " KSTREAM_AGGREGATE_0000000005;\n" + + " KTABLE_TOSTREAM_0000000006;\n" + + " KSTREAM_SINK_0000000007;\n" + + " }\n" + + " subgraph cluster2 {\n" + + " label=\"Sub-Topology: 2\"; color=\"#C8C879\"; bgcolor=\"#FFFFDE\";\n" + + " KSTREAM_SOURCE_0000000008;\n" + + " KSTREAM_FOREACH_0000000009;\n" + + " }\n" + + " weather_stations -> KSTREAM_SOURCE_0000000001;\n" + + " KSTREAM_SOURCE_0000000001 -> KTABLE_SOURCE_0000000002;\n" + + " KTABLE_SOURCE_0000000002 -> weather_stations_STATE_STORE_0000000000;\n" + + " temperature_values -> KSTREAM_SOURCE_0000000003;\n" + + " KSTREAM_SOURCE_0000000003 -> KSTREAM_LEFTJOIN_0000000004;\n" + + " KSTREAM_LEFTJOIN_0000000004 -> KSTREAM_AGGREGATE_0000000005;\n" + + " KSTREAM_AGGREGATE_0000000005 -> weather_stations_store;\n" + + " KSTREAM_AGGREGATE_0000000005 -> KTABLE_TOSTREAM_0000000006;\n" + + " KTABLE_TOSTREAM_0000000006 -> KSTREAM_SINK_0000000007;\n" + + " KSTREAM_SINK_0000000007 -> temperatures_aggregated;\n" + + " REGEX_12 -> KSTREAM_SOURCE_0000000008;\n" + + " KSTREAM_SOURCE_0000000008 -> KSTREAM_FOREACH_0000000009;\n" + + "}", actual.getString("graphviz")); assertEquals("graph TD\n" + " weather-stations[weather-stations] --> KSTREAM-SOURCE-0000000001(KSTREAM-
SOURCE-
0000000001)\n" + " KTABLE-SOURCE-0000000002[KTABLE-
SOURCE-
0000000002] --> weather-stations-STATE-STORE-0000000000(weather-
stations-
STATE-
STORE-
0000000000)\n" + " temperature-values[temperature-values] --> KSTREAM-SOURCE-0000000003(KSTREAM-
SOURCE-
0000000003)\n" + " KSTREAM-AGGREGATE-0000000005[KSTREAM-
AGGREGATE-
0000000005] --> weather-stations-store(weather-
stations-
store)\n" + " KSTREAM-SINK-0000000007[KSTREAM-
SINK-
0000000007] --> temperatures-aggregated(temperatures-aggregated)\n" + + " REGEX_5[notification\\..+] --> KSTREAM-SOURCE-0000000008(KSTREAM-
SOURCE-
0000000008)\n" + " subgraph Sub-Topology: 0\n" + " KSTREAM-SOURCE-0000000001[KSTREAM-
SOURCE-
0000000001] --> KTABLE-SOURCE-0000000002(KTABLE-
SOURCE-
0000000002)\n" + " end\n" @@ -99,6 +110,9 @@ public void shouldParsingStayConstant() { + " KSTREAM-LEFTJOIN-0000000004[KSTREAM-
LEFTJOIN-
0000000004] --> KSTREAM-AGGREGATE-0000000005(KSTREAM-
AGGREGATE-
0000000005)\n" + " KSTREAM-AGGREGATE-0000000005[KSTREAM-
AGGREGATE-
0000000005] --> KTABLE-TOSTREAM-0000000006(KTABLE-
TOSTREAM-
0000000006)\n" + " KTABLE-TOSTREAM-0000000006[KTABLE-
TOSTREAM-
0000000006] --> KSTREAM-SINK-0000000007(KSTREAM-
SINK-
0000000007)\n" + + " end\n" + + " subgraph Sub-Topology: 2\n" + + " KSTREAM-SOURCE-0000000008[KSTREAM-
SOURCE-
0000000008] --> KSTREAM-FOREACH-0000000009(KSTREAM-
FOREACH-
0000000009)\n" + " end", actual.getString("mermaid")); } }