Skip to content

Commit

Permalink
KAFKA-18311: Configuring changelog topics (2/N) (apache#18379)
Browse files Browse the repository at this point in the history
A simplified port of "ChangelogTopics" from the client-side to the group coordinator

Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.

Reviewers: Bruno Cadonna <[email protected]>
  • Loading branch information
lucasbru authored Jan 6, 2025
1 parent e546b02 commit 2521aee
Show file tree
Hide file tree
Showing 2 changed files with 243 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams.topics;

import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;

import org.slf4j.Logger;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* This class is responsible for setting up the changelog topics for a topology. For a given topology, which does not have the number
* of partitions specified for changelog partitions, this class will determine the number of partitions for each non-source changelog topic.
*/
public class ChangelogTopics {

private final Logger log;
private final Collection<Subtopology> subtopologies;
private final Function<String, OptionalInt> topicPartitionCountProvider;

/**
* Constructor for ChangelogTopics.
*
* @param logContext The context for emitting log messages.
* @param subtopologies The subtopologies for the requested topology.
* @param topicPartitionCountProvider Returns the number of partitions for a given topic, representing the current state of the broker
* as well as any partition number decisions that have already been made. In particular, we expect
* the number of partitions for all repartition topics defined, even if they do not exist in the
* broker yet.
*/
public ChangelogTopics(
final LogContext logContext,
final Collection<Subtopology> subtopologies,
final Function<String, OptionalInt> topicPartitionCountProvider
) {
this.log = logContext.logger(getClass());
this.subtopologies = subtopologies;
this.topicPartitionCountProvider = topicPartitionCountProvider;
}

/**
* Determines the number of partitions for each non-source changelog topic in the requested topology.
*
* @return the map of all non-source changelog topics for the requested topology to their required number of partitions.
*/
public Map<String, Integer> setup() {
final Map<String, Integer> changelogTopicPartitions = new HashMap<>();
for (Subtopology subtopology : subtopologies) {
final Set<String> sourceTopics = new HashSet<>(subtopology.sourceTopics());

final OptionalInt maxNumPartitions =
subtopology.sourceTopics().stream().mapToInt(this::getPartitionCountOrFail).max();

if (maxNumPartitions.isEmpty()) {
throw new StreamsInvalidTopologyException("No source topics found for subtopology " + subtopology.subtopologyId());
}
for (final TopicInfo topicInfo : subtopology.stateChangelogTopics()) {
if (!sourceTopics.contains(topicInfo.name())) {
changelogTopicPartitions.put(topicInfo.name(), maxNumPartitions.getAsInt());
}
}
}

log.debug("Expecting state changelog topic partitions {} for the requested topology.",
changelogTopicPartitions.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")));

return changelogTopicPartitions;
}

private int getPartitionCountOrFail(String topic) {
final OptionalInt topicPartitionCount = topicPartitionCountProvider.apply(topic);
if (topicPartitionCount.isEmpty()) {
throw TopicConfigurationException.missingSourceTopics("No partition count for source topic " + topic);
}
return topicPartitionCount.getAsInt();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams.topics;

import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ChangelogTopicsTest {

private static final LogContext LOG_CONTEXT = new LogContext();
private static final String SOURCE_TOPIC_NAME = "source";
private static final String SINK_TOPIC_NAME = "sink";
private static final String REPARTITION_TOPIC_NAME = "repartition";
private static final String CHANGELOG_TOPIC_NAME1 = "changelog1";
private static final TopicConfig TOPIC_CONFIG = new TopicConfig().setKey("config1").setValue("val1");
private static final TopicInfo REPARTITION_TOPIC_INFO = new TopicInfo()
.setName(REPARTITION_TOPIC_NAME)
.setTopicConfigs(List.of(TOPIC_CONFIG));
private static final Subtopology SUBTOPOLOGY_NO_SOURCE = new Subtopology()
.setSubtopologyId("SUBTOPOLOGY_NO_SOURCE")
.setSourceTopics(Collections.emptyList())
.setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME))
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO))
.setStateChangelogTopics(Collections.emptyList());
private static final Subtopology SUBTOPOLOGY_STATELESS = new Subtopology()
.setSubtopologyId("SUBTOPOLOGY_STATELESS")
.setSourceTopics(Collections.singletonList(SOURCE_TOPIC_NAME))
.setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME))
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO))
.setStateChangelogTopics(Collections.emptyList());
private static final TopicInfo SOURCE_CHANGELOG_TOPIC_CONFIG = new TopicInfo()
.setName(SOURCE_TOPIC_NAME)
.setTopicConfigs(List.of(TOPIC_CONFIG));
private static final Subtopology SUBTOPOLOGY_SOURCE_CHANGELOG = new Subtopology()
.setSubtopologyId("SUBTOPOLOGY_SOURCE_CHANGELOG")
.setSourceTopics(Collections.singletonList(SOURCE_TOPIC_NAME))
.setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME))
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO))
.setStateChangelogTopics(List.of(SOURCE_CHANGELOG_TOPIC_CONFIG));
private static final TopicInfo CHANGELOG_TOPIC_CONFIG = new TopicInfo()
.setName(CHANGELOG_TOPIC_NAME1)
.setTopicConfigs(List.of(TOPIC_CONFIG));
private static final Subtopology SUBTOPOLOGY_STATEFUL = new Subtopology()
.setSubtopologyId("SUBTOPOLOGY_STATEFUL")
.setSourceTopics(Collections.singletonList(SOURCE_TOPIC_NAME))
.setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME))
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO))
.setStateChangelogTopics(List.of(CHANGELOG_TOPIC_CONFIG));
private static final Subtopology SUBTOPOLOGY_BOTH = new Subtopology()
.setSubtopologyId("SUBTOPOLOGY_BOTH")
.setSourceTopics(Collections.singletonList(SOURCE_TOPIC_NAME))
.setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME))
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO))
.setStateChangelogTopics(List.of(SOURCE_CHANGELOG_TOPIC_CONFIG, CHANGELOG_TOPIC_CONFIG));

private static OptionalInt topicPartitionProvider(String s) {
return OptionalInt.of(3);
}

@Test
public void shouldFailIfNoSourceTopics() {
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_NO_SOURCE);

final ChangelogTopics changelogTopics =
new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider);
StreamsInvalidTopologyException e = assertThrows(StreamsInvalidTopologyException.class, changelogTopics::setup);

assertTrue(e.getMessage().contains("No source topics found for subtopology"));
}

@Test
public void shouldNotContainChangelogsForStatelessTasks() {
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_STATELESS);

final ChangelogTopics changelogTopics =
new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider);
Map<String, Integer> setup = changelogTopics.setup();

assertEquals(Collections.emptyMap(), setup);
}

@Test
public void shouldContainNonSourceBasedChangelogs() {
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_STATEFUL);

final ChangelogTopics changelogTopics =
new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider);
Map<String, Integer> setup = changelogTopics.setup();

assertEquals(Map.of(CHANGELOG_TOPIC_CONFIG.name(), 3), setup);
}

@Test
public void shouldNotContainSourceBasedChangelogs() {
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_SOURCE_CHANGELOG);

final ChangelogTopics changelogTopics =
new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider);
Map<String, Integer> setup = changelogTopics.setup();

assertEquals(Collections.emptyMap(), setup);
}

@Test
public void shouldContainBothTypesOfPreExistingChangelogs() {
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_BOTH);

final ChangelogTopics changelogTopics =
new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider);
Map<String, Integer> setup = changelogTopics.setup();

assertEquals(Map.of(CHANGELOG_TOPIC_CONFIG.name(), 3), setup);
}
}

0 comments on commit 2521aee

Please sign in to comment.