From 0109a3f7189d332c333cad0625b54779bf8f9cd4 Mon Sep 17 00:00:00 2001 From: Antoine Pourchet Date: Wed, 5 Jun 2024 14:09:37 -0600 Subject: [PATCH] KAFKA-15045: (KIP-924 pt. 17) State store computation fixed (#16194) Fixed the calculation of the store name list based on the subtopology being accessed. Also added a new test to make sure this new functionality works as intended. Reviewers: Anna Sophie Blee-Goldman --- checkstyle/suppressions.xml | 2 +- .../internals/InternalTopologyBuilder.java | 41 +++++++++++++++++-- .../internals/StreamsPartitionAssignor.java | 3 +- .../processor/internals/TopologyMetadata.java | 4 ++ .../InternalTopologyBuilderTest.java | 24 +++++++++++ 5 files changed, 67 insertions(+), 7 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index fc6995dadfe7e..46047a6c0ff76 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -200,7 +200,7 @@ files="StreamThread.java"/> + files="(InternalTopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl).java"/> diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index c160655578ee6..b387760043181 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -144,8 +144,12 @@ public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { private String applicationId = null; + // keyed by subtopology id private Map> nodeGroups = null; + // keyed by subtopology id + private Map> subtopologyIdToStateStoreNames = null; + // The name of the topology this builder belongs to, or null if this is not a NamedTopology private final String topologyName; // TODO KAFKA-13336: we can remove this reference once we make the Topology/NamedTopology class into an interface and implement it @@ -937,14 +941,15 @@ private int putNodeGroupName(final String nodeName, * @return the full topology minus any global state */ public synchronized ProcessorTopology buildTopology() { - final Set nodeGroup = new HashSet<>(); + final Set allNodes = new HashSet<>(); for (final Set value : nodeGroups().values()) { - nodeGroup.addAll(value); + allNodes.addAll(value); } - nodeGroup.removeAll(globalNodeGroups()); + allNodes.removeAll(globalNodeGroups()); initializeSubscription(); - return build(nodeGroup); + initializeSubtopologyIdToStateStoreNamesMap(); + return build(allNodes); } /** @@ -1500,6 +1505,34 @@ private boolean isGlobalSource(final String nodeName) { return false; } + public Set stateStoreNamesForSubtopology(final int subtopologyId) { + return subtopologyIdToStateStoreNames.get(subtopologyId); + } + + private void initializeSubtopologyIdToStateStoreNamesMap() { + final Map> storeNames = new HashMap<>(); + + for (final Map.Entry> nodeGroup : makeNodeGroups().entrySet()) { + final Set subtopologyNodes = nodeGroup.getValue(); + final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(subtopologyNodes); + + if (!isNodeGroupOfGlobalStores) { + final int subtopologyId = nodeGroup.getKey(); + final Set subtopologyStoreNames = new HashSet<>(); + + for (final String nodeName : subtopologyNodes) { + final AbstractNode node = nodeFactories.get(nodeName).describe(); + if (node instanceof Processor) { + subtopologyStoreNames.addAll(((Processor) node).stores()); + } + } + + storeNames.put(subtopologyId, subtopologyStoreNames); + } + } + subtopologyIdToStateStoreNames = storeNames; + } + public TopologyDescription describe() { final TopologyDescription description = new TopologyDescription(topologyName); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 298e58888bd20..53354b3844db1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -570,8 +570,7 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe Function.identity(), taskId -> { final Set stateStoreNames = topologyMetadata - .stateStoreNameToSourceTopicsForTopology(taskId.topologyName()) - .keySet(); + .stateStoreNamesForSubtopology(taskId.topologyName(), taskId.subtopology()); final Set topicPartitions = topicPartitionsForTask.get(taskId); return new DefaultTaskInfo( taskId, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index 150a6be9c5aad..5a0e3407eba88 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -521,6 +521,10 @@ public Map> stateStoreNameToSourceTopicsForTopology(final S return lookupBuilderForNamedTopology(topologyName).stateStoreNameToFullSourceTopicNames(); } + public Set stateStoreNamesForSubtopology(final String topologyName, final int subtopologyId) { + return lookupBuilderForNamedTopology(topologyName).stateStoreNamesForSubtopology(subtopologyId); + } + public Map> stateStoreNameToSourceTopics() { final Map> stateStoreNameToSourceTopics = new HashMap<>(); applyToEachBuilder(b -> stateStoreNameToSourceTopics.putAll(b.stateStoreNameToFullSourceTopicNames())); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index bbb4625d56440..1a0166523710a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -71,6 +71,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -568,6 +569,29 @@ public void testAddStateStore() { assertEquals(storeBuilder.name(), suppliers.get(0).name()); } + @Test + public void testStateStoreNamesForSubtopology() { + builder.addStateStore(storeBuilder); + builder.setApplicationId("X"); + + builder.addSource(null, "source-1", null, null, null, "topic-1"); + builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1"); + builder.connectProcessorAndStateStores("processor-1", storeBuilder.name()); + + builder.addSource(null, "source-2", null, null, null, "topic-2"); + builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-2"); + + builder.buildTopology(); + final Set stateStoreNames = builder.stateStoreNamesForSubtopology(0); + assertThat(stateStoreNames, equalTo(mkSet(storeBuilder.name()))); + + final Set emptyStoreNames = builder.stateStoreNamesForSubtopology(1); + assertThat(emptyStoreNames, equalTo(mkSet())); + + final Set stateStoreNamesUnknownSubtopology = builder.stateStoreNamesForSubtopology(13); + assertThat(stateStoreNamesUnknownSubtopology, nullValue()); + } + @Test public void shouldAllowAddingSameStoreBuilderMultipleTimes() { builder.setApplicationId("X");