diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java new file mode 100644 index 0000000000000..a7792471e4e84 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; + +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * This class is responsible for setting up the changelog topics for a topology. For a given topology, which does not have the number + * of partitions specified for changelog partitions, this class will determine the number of partitions for each non-source changelog topic. + */ +public class ChangelogTopics { + + private final Logger log; + private final Collection subtopologies; + private final Function topicPartitionCountProvider; + + /** + * Constructor for ChangelogTopics. + * + * @param logContext The context for emitting log messages. + * @param subtopologies The subtopologies for the requested topology. + * @param topicPartitionCountProvider Returns the number of partitions for a given topic, representing the current state of the broker + * as well as any partition number decisions that have already been made. In particular, we expect + * the number of partitions for all repartition topics defined, even if they do not exist in the + * broker yet. + */ + public ChangelogTopics( + final LogContext logContext, + final Collection subtopologies, + final Function topicPartitionCountProvider + ) { + this.log = logContext.logger(getClass()); + this.subtopologies = subtopologies; + this.topicPartitionCountProvider = topicPartitionCountProvider; + } + + /** + * Determines the number of partitions for each non-source changelog topic in the requested topology. + * + * @return the map of all non-source changelog topics for the requested topology to their required number of partitions. + */ + public Map setup() { + final Map changelogTopicPartitions = new HashMap<>(); + for (Subtopology subtopology : subtopologies) { + final Set sourceTopics = new HashSet<>(subtopology.sourceTopics()); + + final OptionalInt maxNumPartitions = + subtopology.sourceTopics().stream().mapToInt(this::getPartitionCountOrFail).max(); + + if (maxNumPartitions.isEmpty()) { + throw new StreamsInvalidTopologyException("No source topics found for subtopology " + subtopology.subtopologyId()); + } + for (final TopicInfo topicInfo : subtopology.stateChangelogTopics()) { + if (!sourceTopics.contains(topicInfo.name())) { + changelogTopicPartitions.put(topicInfo.name(), maxNumPartitions.getAsInt()); + } + } + } + + log.debug("Expecting state changelog topic partitions {} for the requested topology.", + changelogTopicPartitions.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", "))); + + return changelogTopicPartitions; + } + + private int getPartitionCountOrFail(String topic) { + final OptionalInt topicPartitionCount = topicPartitionCountProvider.apply(topic); + if (topicPartitionCount.isEmpty()) { + throw TopicConfigurationException.missingSourceTopics("No partition count for source topic " + topic); + } + return topicPartitionCount.getAsInt(); + } +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java new file mode 100644 index 0000000000000..ab7aec1dce2fe --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.OptionalInt; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ChangelogTopicsTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + private static final String SOURCE_TOPIC_NAME = "source"; + private static final String SINK_TOPIC_NAME = "sink"; + private static final String REPARTITION_TOPIC_NAME = "repartition"; + private static final String CHANGELOG_TOPIC_NAME1 = "changelog1"; + private static final TopicConfig TOPIC_CONFIG = new TopicConfig().setKey("config1").setValue("val1"); + private static final TopicInfo REPARTITION_TOPIC_INFO = new TopicInfo() + .setName(REPARTITION_TOPIC_NAME) + .setTopicConfigs(List.of(TOPIC_CONFIG)); + private static final Subtopology SUBTOPOLOGY_NO_SOURCE = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY_NO_SOURCE") + .setSourceTopics(Collections.emptyList()) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME)) + .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO)) + .setStateChangelogTopics(Collections.emptyList()); + private static final Subtopology SUBTOPOLOGY_STATELESS = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY_STATELESS") + .setSourceTopics(Collections.singletonList(SOURCE_TOPIC_NAME)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME)) + .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO)) + .setStateChangelogTopics(Collections.emptyList()); + private static final TopicInfo SOURCE_CHANGELOG_TOPIC_CONFIG = new TopicInfo() + .setName(SOURCE_TOPIC_NAME) + .setTopicConfigs(List.of(TOPIC_CONFIG)); + private static final Subtopology SUBTOPOLOGY_SOURCE_CHANGELOG = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY_SOURCE_CHANGELOG") + .setSourceTopics(Collections.singletonList(SOURCE_TOPIC_NAME)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME)) + .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO)) + .setStateChangelogTopics(List.of(SOURCE_CHANGELOG_TOPIC_CONFIG)); + private static final TopicInfo CHANGELOG_TOPIC_CONFIG = new TopicInfo() + .setName(CHANGELOG_TOPIC_NAME1) + .setTopicConfigs(List.of(TOPIC_CONFIG)); + private static final Subtopology SUBTOPOLOGY_STATEFUL = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY_STATEFUL") + .setSourceTopics(Collections.singletonList(SOURCE_TOPIC_NAME)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME)) + .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO)) + .setStateChangelogTopics(List.of(CHANGELOG_TOPIC_CONFIG)); + private static final Subtopology SUBTOPOLOGY_BOTH = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY_BOTH") + .setSourceTopics(Collections.singletonList(SOURCE_TOPIC_NAME)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME)) + .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO)) + .setStateChangelogTopics(List.of(SOURCE_CHANGELOG_TOPIC_CONFIG, CHANGELOG_TOPIC_CONFIG)); + + private static OptionalInt topicPartitionProvider(String s) { + return OptionalInt.of(3); + } + + @Test + public void shouldFailIfNoSourceTopics() { + final List subtopologies = List.of(SUBTOPOLOGY_NO_SOURCE); + + final ChangelogTopics changelogTopics = + new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider); + StreamsInvalidTopologyException e = assertThrows(StreamsInvalidTopologyException.class, changelogTopics::setup); + + assertTrue(e.getMessage().contains("No source topics found for subtopology")); + } + + @Test + public void shouldNotContainChangelogsForStatelessTasks() { + final List subtopologies = List.of(SUBTOPOLOGY_STATELESS); + + final ChangelogTopics changelogTopics = + new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider); + Map setup = changelogTopics.setup(); + + assertEquals(Collections.emptyMap(), setup); + } + + @Test + public void shouldContainNonSourceBasedChangelogs() { + final List subtopologies = List.of(SUBTOPOLOGY_STATEFUL); + + final ChangelogTopics changelogTopics = + new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider); + Map setup = changelogTopics.setup(); + + assertEquals(Map.of(CHANGELOG_TOPIC_CONFIG.name(), 3), setup); + } + + @Test + public void shouldNotContainSourceBasedChangelogs() { + final List subtopologies = List.of(SUBTOPOLOGY_SOURCE_CHANGELOG); + + final ChangelogTopics changelogTopics = + new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider); + Map setup = changelogTopics.setup(); + + assertEquals(Collections.emptyMap(), setup); + } + + @Test + public void shouldContainBothTypesOfPreExistingChangelogs() { + final List subtopologies = List.of(SUBTOPOLOGY_BOTH); + + final ChangelogTopics changelogTopics = + new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider); + Map setup = changelogTopics.setup(); + + assertEquals(Map.of(CHANGELOG_TOPIC_CONFIG.name(), 3), setup); + } +}