diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java
new file mode 100644
index 0000000000000..855f1ea0b58d1
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Captures the properties required for configuring the internal topics we create for changelogs and repartitioning etc.
+ *
+ * It is derived from the topology sent by the client, and the current state of the topics inside the broker. If the topics on the broker
+ * changes, the internal topic may need to be reconfigured.
+ *
+ * @param name The name of the topic.
+ * @param numberOfPartitions The number of partitions for the topic.
+ * @param replicationFactor The replication factor of the topic. If undefiend, the broker default is used.
+ * @param topicConfigs The topic configurations of the topic.
+ */
+public record ConfiguredInternalTopic(String name,
+ int numberOfPartitions,
+ Optional replicationFactor,
+ Map topicConfigs
+) {
+
+ public ConfiguredInternalTopic {
+ Objects.requireNonNull(name, "name can't be null");
+ Topic.validate(name);
+ if (numberOfPartitions < 1) {
+ throw new IllegalArgumentException("Number of partitions must be at least 1.");
+ }
+ topicConfigs = Collections.unmodifiableMap(Objects.requireNonNull(topicConfigs, "topicConfigs can't be null"));
+ }
+
+ public StreamsGroupDescribeResponseData.TopicInfo asStreamsGroupDescribeTopicInfo() {
+ return new StreamsGroupDescribeResponseData.TopicInfo()
+ .setName(name)
+ .setPartitions(numberOfPartitions)
+ .setReplicationFactor(replicationFactor.orElse((short) 0))
+ .setTopicConfigs(
+ topicConfigs != null ?
+ topicConfigs.entrySet().stream().map(
+ y -> new StreamsGroupDescribeResponseData.KeyValue()
+ .setKey(y.getKey())
+ .setValue(y.getValue())
+ ).collect(Collectors.toList()) : null
+ );
+ }
+
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
new file mode 100644
index 0000000000000..bfc1a86a06b9f
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Internal representation of a subtopology.
+ *
+ * The subtopology is configured according to the number of partitions available in the source topics. It has regular expressions already
+ * resolved and defined exactly the information that is being used by streams groups assignment reconciliation.
+ *
+ * Configured subtopologies may be recreated every time the input topics used by the subtopology are modified.
+ *
+ * @param sourceTopics The source topics of the subtopology.
+ * @param repartitionSourceTopics The repartition source topics of the subtopology.
+ * @param repartitionSinkTopics The repartition sink topics of the subtopology.
+ * @param stateChangelogTopics The state changelog topics of the subtopology.
+ */
+public record ConfiguredSubtopology(Set sourceTopics,
+ Map repartitionSourceTopics,
+ Set repartitionSinkTopics,
+ Map stateChangelogTopics) {
+
+ public ConfiguredSubtopology {
+ Objects.requireNonNull(sourceTopics, "sourceTopics can't be null");
+ Objects.requireNonNull(repartitionSourceTopics, "repartitionSourceTopics can't be null");
+ Objects.requireNonNull(repartitionSinkTopics, "repartitionSinkTopics can't be null");
+ Objects.requireNonNull(stateChangelogTopics, "stateChangelogTopics can't be null");
+ }
+
+ public StreamsGroupDescribeResponseData.Subtopology asStreamsGroupDescribeSubtopology(String subtopologyId) {
+ return new StreamsGroupDescribeResponseData.Subtopology()
+ .setSubtopologyId(subtopologyId)
+ .setSourceTopics(sourceTopics.stream().sorted().collect(Collectors.toList()))
+ .setRepartitionSinkTopics(repartitionSinkTopics.stream().sorted().collect(Collectors.toList()))
+ .setRepartitionSourceTopics(repartitionSourceTopics.values().stream()
+ .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().collect(Collectors.toList()))
+ .setStateChangelogTopics(stateChangelogTopics.values().stream()
+ .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().collect(Collectors.toList()));
+ }
+
+}
\ No newline at end of file
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
new file mode 100644
index 0000000000000..86f8080421c46
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This class captures the result of taking a topology definition sent by the client and using the current state of the topics inside the
+ * broker to configure the internal topics required for the topology.
+ *
+ * @param topologyEpoch The epoch of the topology. Same as the topology epoch in the heartbeat request that last initialized
+ * the topology.
+ * @param subtopologies Contains the subtopologies that have been configured. This can be used by the task assignors, since it
+ * specifies the number of tasks available for every subtopology.
+ * @param internalTopicsToBeCreated Contains a list of internal topics that need to be created. This is used to create the topics in the
+ * broker.
+ * @param topicConfigurationException If the topic configuration process failed, e.g. because expected topics are missing or have an
+ * incorrect number of partitions, this field will store the error that occurred, so that is can be
+ * reported back to the client.
+ */
+public record ConfiguredTopology(int topologyEpoch,
+ Map subtopologies,
+ Map internalTopicsToBeCreated,
+ Optional topicConfigurationException) {
+
+ public ConfiguredTopology {
+ if (topologyEpoch < 0) {
+ throw new IllegalArgumentException("Topology epoch must be non-negative.");
+ }
+ Objects.requireNonNull(subtopologies, "subtopologies can't be null");
+ Objects.requireNonNull(internalTopicsToBeCreated, "internalTopicsToBeCreated can't be null");
+ Objects.requireNonNull(topicConfigurationException, "topicConfigurationException can't be null");
+ }
+
+ public boolean isReady() {
+ return topicConfigurationException.isEmpty();
+ }
+
+ public StreamsGroupDescribeResponseData.Topology asStreamsGroupDescribeTopology() {
+ return new StreamsGroupDescribeResponseData.Topology()
+ .setEpoch(topologyEpoch)
+ .setSubtopologies(subtopologies.entrySet().stream().map(
+ entry -> entry.getValue().asStreamsGroupDescribeSubtopology(entry.getKey())
+ ).collect(Collectors.toList()));
+ }
+
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java
new file mode 100644
index 0000000000000..f52c950b77095
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+
+public class TopicConfigurationException extends RuntimeException {
+
+ private final Status status;
+
+ public TopicConfigurationException(StreamsGroupHeartbeatResponse.Status status, String message) {
+ super(message);
+ this.status = status;
+ }
+
+ public Status status() {
+ return status;
+ }
+
+ public static TopicConfigurationException incorrectlyPartitionedTopics(String message) {
+ return new TopicConfigurationException(Status.INCORRECTLY_PARTITIONED_TOPICS, message);
+ }
+
+ public static TopicConfigurationException missingSourceTopics(String message) {
+ return new TopicConfigurationException(Status.MISSING_SOURCE_TOPICS, message);
+ }
+
+ public static TopicConfigurationException missingInternalTopics(String message) {
+ return new TopicConfigurationException(Status.MISSING_INTERNAL_TOPICS, message);
+ }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java
new file mode 100644
index 0000000000000..e1db0f048ac93
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ConfiguredInternalTopicTest {
+
+ @Test
+ public void testConstructorWithNullName() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredInternalTopic(null, 1, Optional.empty(), Collections.emptyMap()));
+ }
+
+ @Test
+ public void testConstructorWithInvalidName() {
+ assertThrows(InvalidTopicException.class,
+ () -> new ConfiguredInternalTopic("invalid topic name", 1, Optional.empty(), Collections.emptyMap()));
+ }
+
+ @Test
+ public void testConstructorWithNullTopicConfigs() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredInternalTopic("test-topic", 1, Optional.empty(), null));
+ }
+
+ @Test
+ public void testConstructorWithZeroPartitions() {
+ assertThrows(IllegalArgumentException.class,
+ () -> new ConfiguredInternalTopic("test-topic", 0, Optional.empty(), Collections.emptyMap()));
+ }
+
+ @Test
+ public void testAsStreamsGroupDescribeTopicInfo() {
+ String topicName = "test-topic";
+ Map topicConfigs = new HashMap<>();
+ topicConfigs.put("retention.ms", "1000");
+ int numberOfPartitions = 3;
+ Optional replicationFactor = Optional.of((short) 2);
+ ConfiguredInternalTopic configuredInternalTopic = new ConfiguredInternalTopic(
+ topicName, numberOfPartitions, replicationFactor, topicConfigs);
+
+ StreamsGroupDescribeResponseData.TopicInfo topicInfo = configuredInternalTopic.asStreamsGroupDescribeTopicInfo();
+
+ assertEquals(topicName, topicInfo.name());
+ assertEquals(numberOfPartitions, topicInfo.partitions());
+ assertEquals(replicationFactor.orElse((short) 0).shortValue(), topicInfo.replicationFactor());
+ assertEquals(1, topicInfo.topicConfigs().size());
+ assertEquals("1000", topicInfo.topicConfigs().get(0).value());
+ }
+}
\ No newline at end of file
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
new file mode 100644
index 0000000000000..d30716c25f7d8
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConfiguredSubtopologyTest {
+
+ @Test
+ public void testConstructorWithNullSourceTopics() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredSubtopology(
+ null,
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptyMap()
+ )
+ );
+ }
+
+ @Test
+ public void testConstructorWithNullRepartitionSourceTopics() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredSubtopology(
+ Collections.emptySet(),
+ null,
+ Collections.emptySet(),
+ Collections.emptyMap()
+ )
+ );
+ }
+
+ @Test
+ public void testConstructorWithNullRepartitionSinkTopics() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredSubtopology(
+ Collections.emptySet(),
+ Collections.emptyMap(),
+ null,
+ Collections.emptyMap()
+ )
+ );
+ }
+
+ @Test
+ public void testConstructorWithNullStateChangelogTopics() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredSubtopology(
+ Collections.emptySet(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ null
+ )
+ );
+ }
+
+ @Test
+ public void testAsStreamsGroupDescribeSubtopology() {
+ String subtopologyId = "subtopology1";
+ Set sourceTopics = new HashSet<>(Set.of("sourceTopic1", "sourceTopic2"));
+ Set repartitionSinkTopics = new HashSet<>(Set.of("repartitionSinkTopic1", "repartitionSinkTopic2"));
+ ConfiguredInternalTopic internalTopicMock = mock(ConfiguredInternalTopic.class);
+ StreamsGroupDescribeResponseData.TopicInfo topicInfo = new StreamsGroupDescribeResponseData.TopicInfo();
+ when(internalTopicMock.asStreamsGroupDescribeTopicInfo()).thenReturn(topicInfo);
+ Map repartitionSourceTopics = Map.of("repartitionSourceTopic1", internalTopicMock);
+ Map stateChangelogTopics = Map.of("stateChangelogTopic1", internalTopicMock);
+ ConfiguredSubtopology configuredSubtopology = new ConfiguredSubtopology(
+ sourceTopics, repartitionSourceTopics, repartitionSinkTopics, stateChangelogTopics);
+
+ StreamsGroupDescribeResponseData.Subtopology subtopology = configuredSubtopology.asStreamsGroupDescribeSubtopology(subtopologyId);
+
+ assertEquals(subtopologyId, subtopology.subtopologyId());
+ assertEquals(sourceTopics.stream().sorted().toList(), subtopology.sourceTopics());
+ assertEquals(repartitionSinkTopics.stream().sorted().toList(), subtopology.repartitionSinkTopics());
+ assertEquals(List.of(topicInfo), subtopology.repartitionSourceTopics());
+ assertEquals(List.of(topicInfo), subtopology.stateChangelogTopics());
+ }
+
+}
\ No newline at end of file
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
new file mode 100644
index 0000000000000..fc862a7a02745
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConfiguredTopologyTest {
+
+ @Test
+ public void testConstructorWithNullSubtopologies() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredTopology(
+ 0,
+ null,
+ Collections.emptyMap(),
+ Optional.empty()
+ )
+ );
+ }
+
+ @Test
+ public void testConstructorWithNullInternalTopicsToBeCreated() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredTopology(
+ 0,
+ Collections.emptyMap(),
+ null,
+ Optional.empty()
+ )
+ );
+ }
+
+ @Test
+ public void testConstructorWithNullTopicConfigurationException() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredTopology(
+ 0,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ null
+ )
+ );
+ }
+
+ @Test
+ public void testConstructorWithInvalidTopologyEpoch() {
+ assertThrows(IllegalArgumentException.class,
+ () -> new ConfiguredTopology(
+ -1,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Optional.empty()
+ )
+ );
+ }
+
+ @Test
+ public void testIsReady() {
+ ConfiguredTopology readyTopology = new ConfiguredTopology(
+ 1, new HashMap<>(), new HashMap<>(), Optional.empty());
+ assertTrue(readyTopology.isReady());
+
+ ConfiguredTopology notReadyTopology = new ConfiguredTopology(
+ 1, new HashMap<>(), new HashMap<>(), Optional.of(TopicConfigurationException.missingSourceTopics("missing")));
+ assertFalse(notReadyTopology.isReady());
+ }
+
+ @Test
+ public void testAsStreamsGroupDescribeTopology() {
+ int topologyEpoch = 1;
+ ConfiguredSubtopology subtopologyMock = mock(ConfiguredSubtopology.class);
+ StreamsGroupDescribeResponseData.Subtopology subtopologyResponse = new StreamsGroupDescribeResponseData.Subtopology();
+ when(subtopologyMock.asStreamsGroupDescribeSubtopology(Mockito.anyString())).thenReturn(subtopologyResponse);
+ Map subtopologies = new HashMap<>();
+ subtopologies.put("subtopology1", subtopologyMock);
+ Map internalTopicsToBeCreated = new HashMap<>();
+ Optional topicConfigurationException = Optional.empty();
+ ConfiguredTopology configuredTopology = new ConfiguredTopology(
+ topologyEpoch, subtopologies, internalTopicsToBeCreated, topicConfigurationException);
+
+ StreamsGroupDescribeResponseData.Topology topology = configuredTopology.asStreamsGroupDescribeTopology();
+
+ assertEquals(topologyEpoch, topology.epoch());
+ assertEquals(1, topology.subtopologies().size());
+ assertEquals(subtopologyResponse, topology.subtopologies().get(0));
+ }
+}
\ No newline at end of file
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java
new file mode 100644
index 0000000000000..479cef5db13aa
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TopicConfigurationExceptionTest {
+
+ @Test
+ public void testMissingSourceTopics() {
+ TopicConfigurationException exception = TopicConfigurationException.missingSourceTopics("test");
+ assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
+ assertEquals("test", exception.getMessage());
+ }
+
+ @Test
+ public void testMissingInternalTopics() {
+ TopicConfigurationException exception = TopicConfigurationException.missingInternalTopics("test");
+ assertEquals(Status.MISSING_INTERNAL_TOPICS, exception.status());
+ assertEquals("test", exception.getMessage());
+ }
+
+ @Test
+ public void testIncorrectlyPartitionedTopics() {
+ TopicConfigurationException exception = TopicConfigurationException.incorrectlyPartitionedTopics("test");
+ assertEquals(Status.INCORRECTLY_PARTITIONED_TOPICS, exception.status());
+ assertEquals("test", exception.getMessage());
+ }
+
+}