From 35489bfca32a97bf19a7a5fb06e4389b3b2f3b07 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Mon, 6 Jan 2025 13:51:00 +0100 Subject: [PATCH] KAFKA-18311: Add internal datastructure for configuring topologies (1/N) (#18268) Clients in the Streams Rebalance Protocol send an "unconfigured" representation of the topology to the broker. That is, the number of input topics and (some) internal topics is not fixed, regular expressions are not resolved. The broker takes this description of the topology and, together with the current state of the topics on the broker, derives a ConfiguredTopology. The configured topology is what is being returned from StreamsGroupDescribe, and has all number of partitions defined, and regular expressions resolved. The configured topology also contains missing internal topics that need to be created, and potentially configuration errors, such as missing source topics. In this change, we add the internal data structures for representing the configured topology. They differ in some details from the data structures used in the RPCs. Most importantly, they can be evolved independently of the public interface. Reviewers: Bruno Cadonna --- .../topics/ConfiguredInternalTopic.java | 69 +++++++++++ .../streams/topics/ConfiguredSubtopology.java | 62 ++++++++++ .../streams/topics/ConfiguredTopology.java | 67 ++++++++++ .../topics/TopicConfigurationException.java | 46 +++++++ .../topics/ConfiguredInternalTopicTest.java | 76 ++++++++++++ .../topics/ConfiguredSubtopologyTest.java | 106 ++++++++++++++++ .../topics/ConfiguredTopologyTest.java | 117 ++++++++++++++++++ .../TopicConfigurationExceptionTest.java | 48 +++++++ 8 files changed, 591 insertions(+) create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java new file mode 100644 index 0000000000000..855f1ea0b58d1 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Captures the properties required for configuring the internal topics we create for changelogs and repartitioning etc. + *

+ * It is derived from the topology sent by the client, and the current state of the topics inside the broker. If the topics on the broker + * changes, the internal topic may need to be reconfigured. + * + * @param name The name of the topic. + * @param numberOfPartitions The number of partitions for the topic. + * @param replicationFactor The replication factor of the topic. If undefiend, the broker default is used. + * @param topicConfigs The topic configurations of the topic. + */ +public record ConfiguredInternalTopic(String name, + int numberOfPartitions, + Optional replicationFactor, + Map topicConfigs +) { + + public ConfiguredInternalTopic { + Objects.requireNonNull(name, "name can't be null"); + Topic.validate(name); + if (numberOfPartitions < 1) { + throw new IllegalArgumentException("Number of partitions must be at least 1."); + } + topicConfigs = Collections.unmodifiableMap(Objects.requireNonNull(topicConfigs, "topicConfigs can't be null")); + } + + public StreamsGroupDescribeResponseData.TopicInfo asStreamsGroupDescribeTopicInfo() { + return new StreamsGroupDescribeResponseData.TopicInfo() + .setName(name) + .setPartitions(numberOfPartitions) + .setReplicationFactor(replicationFactor.orElse((short) 0)) + .setTopicConfigs( + topicConfigs != null ? + topicConfigs.entrySet().stream().map( + y -> new StreamsGroupDescribeResponseData.KeyValue() + .setKey(y.getKey()) + .setValue(y.getValue()) + ).collect(Collectors.toList()) : null + ); + } + +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java new file mode 100644 index 0000000000000..bfc1a86a06b9f --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; + +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Internal representation of a subtopology. + *

+ * The subtopology is configured according to the number of partitions available in the source topics. It has regular expressions already + * resolved and defined exactly the information that is being used by streams groups assignment reconciliation. + *

+ * Configured subtopologies may be recreated every time the input topics used by the subtopology are modified. + * + * @param sourceTopics The source topics of the subtopology. + * @param repartitionSourceTopics The repartition source topics of the subtopology. + * @param repartitionSinkTopics The repartition sink topics of the subtopology. + * @param stateChangelogTopics The state changelog topics of the subtopology. + */ +public record ConfiguredSubtopology(Set sourceTopics, + Map repartitionSourceTopics, + Set repartitionSinkTopics, + Map stateChangelogTopics) { + + public ConfiguredSubtopology { + Objects.requireNonNull(sourceTopics, "sourceTopics can't be null"); + Objects.requireNonNull(repartitionSourceTopics, "repartitionSourceTopics can't be null"); + Objects.requireNonNull(repartitionSinkTopics, "repartitionSinkTopics can't be null"); + Objects.requireNonNull(stateChangelogTopics, "stateChangelogTopics can't be null"); + } + + public StreamsGroupDescribeResponseData.Subtopology asStreamsGroupDescribeSubtopology(String subtopologyId) { + return new StreamsGroupDescribeResponseData.Subtopology() + .setSubtopologyId(subtopologyId) + .setSourceTopics(sourceTopics.stream().sorted().collect(Collectors.toList())) + .setRepartitionSinkTopics(repartitionSinkTopics.stream().sorted().collect(Collectors.toList())) + .setRepartitionSourceTopics(repartitionSourceTopics.values().stream() + .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().collect(Collectors.toList())) + .setStateChangelogTopics(stateChangelogTopics.values().stream() + .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().collect(Collectors.toList())); + } + +} \ No newline at end of file diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java new file mode 100644 index 0000000000000..86f8080421c46 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; + +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * This class captures the result of taking a topology definition sent by the client and using the current state of the topics inside the + * broker to configure the internal topics required for the topology. + * + * @param topologyEpoch The epoch of the topology. Same as the topology epoch in the heartbeat request that last initialized + * the topology. + * @param subtopologies Contains the subtopologies that have been configured. This can be used by the task assignors, since it + * specifies the number of tasks available for every subtopology. + * @param internalTopicsToBeCreated Contains a list of internal topics that need to be created. This is used to create the topics in the + * broker. + * @param topicConfigurationException If the topic configuration process failed, e.g. because expected topics are missing or have an + * incorrect number of partitions, this field will store the error that occurred, so that is can be + * reported back to the client. + */ +public record ConfiguredTopology(int topologyEpoch, + Map subtopologies, + Map internalTopicsToBeCreated, + Optional topicConfigurationException) { + + public ConfiguredTopology { + if (topologyEpoch < 0) { + throw new IllegalArgumentException("Topology epoch must be non-negative."); + } + Objects.requireNonNull(subtopologies, "subtopologies can't be null"); + Objects.requireNonNull(internalTopicsToBeCreated, "internalTopicsToBeCreated can't be null"); + Objects.requireNonNull(topicConfigurationException, "topicConfigurationException can't be null"); + } + + public boolean isReady() { + return topicConfigurationException.isEmpty(); + } + + public StreamsGroupDescribeResponseData.Topology asStreamsGroupDescribeTopology() { + return new StreamsGroupDescribeResponseData.Topology() + .setEpoch(topologyEpoch) + .setSubtopologies(subtopologies.entrySet().stream().map( + entry -> entry.getValue().asStreamsGroupDescribeSubtopology(entry.getKey()) + ).collect(Collectors.toList())); + } + +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java new file mode 100644 index 0000000000000..f52c950b77095 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; + +public class TopicConfigurationException extends RuntimeException { + + private final Status status; + + public TopicConfigurationException(StreamsGroupHeartbeatResponse.Status status, String message) { + super(message); + this.status = status; + } + + public Status status() { + return status; + } + + public static TopicConfigurationException incorrectlyPartitionedTopics(String message) { + return new TopicConfigurationException(Status.INCORRECTLY_PARTITIONED_TOPICS, message); + } + + public static TopicConfigurationException missingSourceTopics(String message) { + return new TopicConfigurationException(Status.MISSING_SOURCE_TOPICS, message); + } + + public static TopicConfigurationException missingInternalTopics(String message) { + return new TopicConfigurationException(Status.MISSING_INTERNAL_TOPICS, message); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java new file mode 100644 index 0000000000000..e1db0f048ac93 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ConfiguredInternalTopicTest { + + @Test + public void testConstructorWithNullName() { + assertThrows(NullPointerException.class, + () -> new ConfiguredInternalTopic(null, 1, Optional.empty(), Collections.emptyMap())); + } + + @Test + public void testConstructorWithInvalidName() { + assertThrows(InvalidTopicException.class, + () -> new ConfiguredInternalTopic("invalid topic name", 1, Optional.empty(), Collections.emptyMap())); + } + + @Test + public void testConstructorWithNullTopicConfigs() { + assertThrows(NullPointerException.class, + () -> new ConfiguredInternalTopic("test-topic", 1, Optional.empty(), null)); + } + + @Test + public void testConstructorWithZeroPartitions() { + assertThrows(IllegalArgumentException.class, + () -> new ConfiguredInternalTopic("test-topic", 0, Optional.empty(), Collections.emptyMap())); + } + + @Test + public void testAsStreamsGroupDescribeTopicInfo() { + String topicName = "test-topic"; + Map topicConfigs = new HashMap<>(); + topicConfigs.put("retention.ms", "1000"); + int numberOfPartitions = 3; + Optional replicationFactor = Optional.of((short) 2); + ConfiguredInternalTopic configuredInternalTopic = new ConfiguredInternalTopic( + topicName, numberOfPartitions, replicationFactor, topicConfigs); + + StreamsGroupDescribeResponseData.TopicInfo topicInfo = configuredInternalTopic.asStreamsGroupDescribeTopicInfo(); + + assertEquals(topicName, topicInfo.name()); + assertEquals(numberOfPartitions, topicInfo.partitions()); + assertEquals(replicationFactor.orElse((short) 0).shortValue(), topicInfo.replicationFactor()); + assertEquals(1, topicInfo.topicConfigs().size()); + assertEquals("1000", topicInfo.topicConfigs().get(0).value()); + } +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java new file mode 100644 index 0000000000000..d30716c25f7d8 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ConfiguredSubtopologyTest { + + @Test + public void testConstructorWithNullSourceTopics() { + assertThrows(NullPointerException.class, + () -> new ConfiguredSubtopology( + null, + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptyMap() + ) + ); + } + + @Test + public void testConstructorWithNullRepartitionSourceTopics() { + assertThrows(NullPointerException.class, + () -> new ConfiguredSubtopology( + Collections.emptySet(), + null, + Collections.emptySet(), + Collections.emptyMap() + ) + ); + } + + @Test + public void testConstructorWithNullRepartitionSinkTopics() { + assertThrows(NullPointerException.class, + () -> new ConfiguredSubtopology( + Collections.emptySet(), + Collections.emptyMap(), + null, + Collections.emptyMap() + ) + ); + } + + @Test + public void testConstructorWithNullStateChangelogTopics() { + assertThrows(NullPointerException.class, + () -> new ConfiguredSubtopology( + Collections.emptySet(), + Collections.emptyMap(), + Collections.emptySet(), + null + ) + ); + } + + @Test + public void testAsStreamsGroupDescribeSubtopology() { + String subtopologyId = "subtopology1"; + Set sourceTopics = new HashSet<>(Set.of("sourceTopic1", "sourceTopic2")); + Set repartitionSinkTopics = new HashSet<>(Set.of("repartitionSinkTopic1", "repartitionSinkTopic2")); + ConfiguredInternalTopic internalTopicMock = mock(ConfiguredInternalTopic.class); + StreamsGroupDescribeResponseData.TopicInfo topicInfo = new StreamsGroupDescribeResponseData.TopicInfo(); + when(internalTopicMock.asStreamsGroupDescribeTopicInfo()).thenReturn(topicInfo); + Map repartitionSourceTopics = Map.of("repartitionSourceTopic1", internalTopicMock); + Map stateChangelogTopics = Map.of("stateChangelogTopic1", internalTopicMock); + ConfiguredSubtopology configuredSubtopology = new ConfiguredSubtopology( + sourceTopics, repartitionSourceTopics, repartitionSinkTopics, stateChangelogTopics); + + StreamsGroupDescribeResponseData.Subtopology subtopology = configuredSubtopology.asStreamsGroupDescribeSubtopology(subtopologyId); + + assertEquals(subtopologyId, subtopology.subtopologyId()); + assertEquals(sourceTopics.stream().sorted().toList(), subtopology.sourceTopics()); + assertEquals(repartitionSinkTopics.stream().sorted().toList(), subtopology.repartitionSinkTopics()); + assertEquals(List.of(topicInfo), subtopology.repartitionSourceTopics()); + assertEquals(List.of(topicInfo), subtopology.stateChangelogTopics()); + } + +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java new file mode 100644 index 0000000000000..fc862a7a02745 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ConfiguredTopologyTest { + + @Test + public void testConstructorWithNullSubtopologies() { + assertThrows(NullPointerException.class, + () -> new ConfiguredTopology( + 0, + null, + Collections.emptyMap(), + Optional.empty() + ) + ); + } + + @Test + public void testConstructorWithNullInternalTopicsToBeCreated() { + assertThrows(NullPointerException.class, + () -> new ConfiguredTopology( + 0, + Collections.emptyMap(), + null, + Optional.empty() + ) + ); + } + + @Test + public void testConstructorWithNullTopicConfigurationException() { + assertThrows(NullPointerException.class, + () -> new ConfiguredTopology( + 0, + Collections.emptyMap(), + Collections.emptyMap(), + null + ) + ); + } + + @Test + public void testConstructorWithInvalidTopologyEpoch() { + assertThrows(IllegalArgumentException.class, + () -> new ConfiguredTopology( + -1, + Collections.emptyMap(), + Collections.emptyMap(), + Optional.empty() + ) + ); + } + + @Test + public void testIsReady() { + ConfiguredTopology readyTopology = new ConfiguredTopology( + 1, new HashMap<>(), new HashMap<>(), Optional.empty()); + assertTrue(readyTopology.isReady()); + + ConfiguredTopology notReadyTopology = new ConfiguredTopology( + 1, new HashMap<>(), new HashMap<>(), Optional.of(TopicConfigurationException.missingSourceTopics("missing"))); + assertFalse(notReadyTopology.isReady()); + } + + @Test + public void testAsStreamsGroupDescribeTopology() { + int topologyEpoch = 1; + ConfiguredSubtopology subtopologyMock = mock(ConfiguredSubtopology.class); + StreamsGroupDescribeResponseData.Subtopology subtopologyResponse = new StreamsGroupDescribeResponseData.Subtopology(); + when(subtopologyMock.asStreamsGroupDescribeSubtopology(Mockito.anyString())).thenReturn(subtopologyResponse); + Map subtopologies = new HashMap<>(); + subtopologies.put("subtopology1", subtopologyMock); + Map internalTopicsToBeCreated = new HashMap<>(); + Optional topicConfigurationException = Optional.empty(); + ConfiguredTopology configuredTopology = new ConfiguredTopology( + topologyEpoch, subtopologies, internalTopicsToBeCreated, topicConfigurationException); + + StreamsGroupDescribeResponseData.Topology topology = configuredTopology.asStreamsGroupDescribeTopology(); + + assertEquals(topologyEpoch, topology.epoch()); + assertEquals(1, topology.subtopologies().size()); + assertEquals(subtopologyResponse, topology.subtopologies().get(0)); + } +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java new file mode 100644 index 0000000000000..479cef5db13aa --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TopicConfigurationExceptionTest { + + @Test + public void testMissingSourceTopics() { + TopicConfigurationException exception = TopicConfigurationException.missingSourceTopics("test"); + assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status()); + assertEquals("test", exception.getMessage()); + } + + @Test + public void testMissingInternalTopics() { + TopicConfigurationException exception = TopicConfigurationException.missingInternalTopics("test"); + assertEquals(Status.MISSING_INTERNAL_TOPICS, exception.status()); + assertEquals("test", exception.getMessage()); + } + + @Test + public void testIncorrectlyPartitionedTopics() { + TopicConfigurationException exception = TopicConfigurationException.incorrectlyPartitionedTopics("test"); + assertEquals(Status.INCORRECTLY_PARTITIONED_TOPICS, exception.status()); + assertEquals("test", exception.getMessage()); + } + +}