KAFKA-18311: Add internal data structures for configuring topologies (1/N) (#18268)

Clients in the Streams Rebalance Protocol send an "unconfigured" representation of the topology to the broker. That is, the number of partitions of the input topics and of (some) internal topics is not fixed, and regular expressions are not resolved. The broker takes this description of the topology and, together with the current state of the topics on the broker, derives a ConfiguredTopology. The configured topology is what is returned from StreamsGroupDescribe: it has the number of partitions defined for all topics and all regular expressions resolved. It also lists the missing internal topics that need to be created and, potentially, configuration errors such as missing source topics.

In this change, we add the internal data structures for representing the configured topology. They differ in some details from the data structures used in the RPCs. Most importantly, they can be evolved independently of the public interface.
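
Purely as an illustration of how the new data structures compose (this sketch is not part of the commit; the class name ConfiguredTopologyExample, the topic names, the subtopology id and the single-changelog layout are made up):

import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;

public class ConfiguredTopologyExample {

    public static void main(String[] args) {
        // A changelog topic whose partition count was derived from the source topics on the broker.
        ConfiguredInternalTopic changelog = new ConfiguredInternalTopic(
            "app-store-changelog",                  // illustrative name
            3,                                      // number of partitions
            Optional.empty(),                       // use the broker default replication factor
            Map.of("cleanup.policy", "compact"));

        // A subtopology with its regular expressions already resolved to concrete source topics.
        ConfiguredSubtopology subtopology = new ConfiguredSubtopology(
            Set.of("input-topic"),                  // source topics
            Map.of(),                               // repartition source topics
            Set.of(),                               // repartition sink topics
            Map.of(changelog.name(), changelog));   // state changelog topics

        // The configured topology that StreamsGroupDescribe and the task assignors operate on.
        ConfiguredTopology topology = new ConfiguredTopology(
            1,                                      // topology epoch from the initializing heartbeat
            Map.of("subtopology-0", subtopology),   // configured subtopologies by id
            Map.of(),                               // internal topics that still need to be created
            Optional.empty());                      // no configuration error

        System.out.println(topology.isReady());    // prints "true"
    }
}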

Reviewers: Bruno Cadonna <[email protected]>
lucasbru authored Jan 6, 2025
1 parent 2e4a378 commit 35489bf
Showing 8 changed files with 591 additions and 0 deletions.
ConfiguredInternalTopic.java
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams.topics;

import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Captures the properties required for configuring the internal topics we create for changelogs, repartitioning, etc.
 * <p>
 * It is derived from the topology sent by the client and the current state of the topics on the broker. If the topics on the broker
 * change, the internal topic may need to be reconfigured.
 *
 * @param name The name of the topic.
 * @param numberOfPartitions The number of partitions for the topic.
 * @param replicationFactor The replication factor of the topic. If undefined, the broker default is used.
* @param topicConfigs The topic configurations of the topic.
*/
public record ConfiguredInternalTopic(String name,
int numberOfPartitions,
Optional<Short> replicationFactor,
Map<String, String> topicConfigs
) {

public ConfiguredInternalTopic {
Objects.requireNonNull(name, "name can't be null");
Topic.validate(name);
if (numberOfPartitions < 1) {
throw new IllegalArgumentException("Number of partitions must be at least 1.");
}
topicConfigs = Collections.unmodifiableMap(Objects.requireNonNull(topicConfigs, "topicConfigs can't be null"));
}

    public StreamsGroupDescribeResponseData.TopicInfo asStreamsGroupDescribeTopicInfo() {
        return new StreamsGroupDescribeResponseData.TopicInfo()
            .setName(name)
            .setPartitions(numberOfPartitions)
            // An undefined replication factor is reported as 0, which stands for the broker default.
            .setReplicationFactor(replicationFactor.orElse((short) 0))
            // topicConfigs is guaranteed to be non-null by the compact constructor.
            .setTopicConfigs(topicConfigs.entrySet().stream()
                .map(entry -> new StreamsGroupDescribeResponseData.KeyValue()
                    .setKey(entry.getKey())
                    .setValue(entry.getValue()))
                .collect(Collectors.toList()));
    }

}
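
A brief usage sketch (a fragment, assuming the imports of the file above; it mirrors what the new ConfiguredInternalTopicTest below exercises, and the topic name and config are made up). An undefined replication factor is reported as 0 in the describe response, meaning the broker default is used:

// Fragment for illustration only.
ConfiguredInternalTopic repartitionTopic = new ConfiguredInternalTopic(
    "repartition-topic", 4, Optional.empty(), Map.of("retention.ms", "-1"));

StreamsGroupDescribeResponseData.TopicInfo info = repartitionTopic.asStreamsGroupDescribeTopicInfo();
// info.replicationFactor() is 0 here, i.e. "use the broker default".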
ConfiguredSubtopology.java
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams.topics;

import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;

import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Internal representation of a subtopology.
* <p>
 * The subtopology is configured according to the number of partitions available in the source topics. It has regular expressions already
 * resolved and defines exactly the information that is used by the streams group assignment reconciliation.
* <p>
* Configured subtopologies may be recreated every time the input topics used by the subtopology are modified.
*
* @param sourceTopics The source topics of the subtopology.
* @param repartitionSourceTopics The repartition source topics of the subtopology.
* @param repartitionSinkTopics The repartition sink topics of the subtopology.
* @param stateChangelogTopics The state changelog topics of the subtopology.
*/
public record ConfiguredSubtopology(Set<String> sourceTopics,
Map<String, ConfiguredInternalTopic> repartitionSourceTopics,
Set<String> repartitionSinkTopics,
Map<String, ConfiguredInternalTopic> stateChangelogTopics) {

public ConfiguredSubtopology {
Objects.requireNonNull(sourceTopics, "sourceTopics can't be null");
Objects.requireNonNull(repartitionSourceTopics, "repartitionSourceTopics can't be null");
Objects.requireNonNull(repartitionSinkTopics, "repartitionSinkTopics can't be null");
Objects.requireNonNull(stateChangelogTopics, "stateChangelogTopics can't be null");
}

    public StreamsGroupDescribeResponseData.Subtopology asStreamsGroupDescribeSubtopology(String subtopologyId) {
        return new StreamsGroupDescribeResponseData.Subtopology()
            .setSubtopologyId(subtopologyId)
            .setSourceTopics(sourceTopics.stream().sorted().collect(Collectors.toList()))
            .setRepartitionSinkTopics(repartitionSinkTopics.stream().sorted().collect(Collectors.toList()))
            // TopicInfo does not implement Comparable, so sort explicitly by topic name.
            .setRepartitionSourceTopics(repartitionSourceTopics.values().stream()
                .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo)
                .sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name))
                .collect(Collectors.toList()))
            .setStateChangelogTopics(stateChangelogTopics.values().stream()
                .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo)
                .sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name))
                .collect(Collectors.toList()));
    }

}
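
For example (a fragment, assuming the imports of the file above; the topic names and the subtopology id are made up), converting a configured subtopology for a StreamsGroupDescribe response:

// Fragment for illustration only.
ConfiguredSubtopology subtopology = new ConfiguredSubtopology(
    Set.of("orders", "payments"),             // source topics, regular expressions already resolved
    Map.of(),                                 // repartition source topics
    Set.of(),                                 // repartition sink topics
    Map.of());                                // state changelog topics

StreamsGroupDescribeResponseData.Subtopology described =
    subtopology.asStreamsGroupDescribeSubtopology("subtopology-0");
// described.sourceTopics() is sorted: ["orders", "payments"]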
ConfiguredTopology.java
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams.topics;

import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* This class captures the result of taking a topology definition sent by the client and using the current state of the topics inside the
* broker to configure the internal topics required for the topology.
*
* @param topologyEpoch The epoch of the topology. Same as the topology epoch in the heartbeat request that last initialized
* the topology.
* @param subtopologies Contains the subtopologies that have been configured. This can be used by the task assignors, since it
* specifies the number of tasks available for every subtopology.
 * @param internalTopicsToBeCreated Contains the internal topics that still need to be created. This is used to create the topics on
 *                                  the broker.
 * @param topicConfigurationException If the topic configuration process failed, e.g. because expected topics are missing or have an
 *                                    incorrect number of partitions, this field stores the error that occurred, so that it can be
 *                                    reported back to the client.
*/
public record ConfiguredTopology(int topologyEpoch,
Map<String, ConfiguredSubtopology> subtopologies,
Map<String, CreatableTopic> internalTopicsToBeCreated,
Optional<TopicConfigurationException> topicConfigurationException) {

public ConfiguredTopology {
if (topologyEpoch < 0) {
throw new IllegalArgumentException("Topology epoch must be non-negative.");
}
Objects.requireNonNull(subtopologies, "subtopologies can't be null");
Objects.requireNonNull(internalTopicsToBeCreated, "internalTopicsToBeCreated can't be null");
Objects.requireNonNull(topicConfigurationException, "topicConfigurationException can't be null");
}

public boolean isReady() {
return topicConfigurationException.isEmpty();
}

public StreamsGroupDescribeResponseData.Topology asStreamsGroupDescribeTopology() {
return new StreamsGroupDescribeResponseData.Topology()
.setEpoch(topologyEpoch)
.setSubtopologies(subtopologies.entrySet().stream().map(
entry -> entry.getValue().asStreamsGroupDescribeSubtopology(entry.getKey())
).collect(Collectors.toList()));
}

}
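
A fragment sketching how a caller might branch on readiness (not part of this commit; it assumes the imports of the file above and a `topology` built as in the earlier sketch):

// Fragment for illustration only.
if (topology.isReady()) {
    // No configuration error: the subtopologies can be used for task assignment,
    // and the topology can be reported back via StreamsGroupDescribe.
    StreamsGroupDescribeResponseData.Topology describedTopology = topology.asStreamsGroupDescribeTopology();
} else {
    // Otherwise the stored error explains what is wrong (e.g. missing source topics).
    TopicConfigurationException error = topology.topicConfigurationException().get();
}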
TopicConfigurationException.java
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams.topics;

import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;

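/**
 * Indicates that the topology could not be configured against the current state of the topics on the broker, for example because
 * source topics are missing or topics are incorrectly partitioned. The status is used to report the problem back to the client in the
 * streams group heartbeat response.
 */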
public class TopicConfigurationException extends RuntimeException {

private final Status status;

public TopicConfigurationException(StreamsGroupHeartbeatResponse.Status status, String message) {
super(message);
this.status = status;
}

public Status status() {
return status;
}

public static TopicConfigurationException incorrectlyPartitionedTopics(String message) {
return new TopicConfigurationException(Status.INCORRECTLY_PARTITIONED_TOPICS, message);
}

public static TopicConfigurationException missingSourceTopics(String message) {
return new TopicConfigurationException(Status.MISSING_SOURCE_TOPICS, message);
}

public static TopicConfigurationException missingInternalTopics(String message) {
return new TopicConfigurationException(Status.MISSING_INTERNAL_TOPICS, message);
}
}
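
For instance (a fragment, assuming the imports of the files above; the topic name and message text are made up), the topic configuration step could record a missing source topic and later surface the status to the client:

// Fragment for illustration only.
TopicConfigurationException exception =
    TopicConfigurationException.missingSourceTopics("Source topics [orders] are missing.");

ConfiguredTopology failed = new ConfiguredTopology(1, Map.of(), Map.of(), Optional.of(exception));

// failed.isReady() is false; the status drives the heartbeat response.
StreamsGroupHeartbeatResponse.Status status = exception.status();  // MISSING_SOURCE_TOPICS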
ConfiguredInternalTopicTest.java
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams.topics;

import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class ConfiguredInternalTopicTest {

@Test
public void testConstructorWithNullName() {
assertThrows(NullPointerException.class,
() -> new ConfiguredInternalTopic(null, 1, Optional.empty(), Collections.emptyMap()));
}

@Test
public void testConstructorWithInvalidName() {
assertThrows(InvalidTopicException.class,
() -> new ConfiguredInternalTopic("invalid topic name", 1, Optional.empty(), Collections.emptyMap()));
}

@Test
public void testConstructorWithNullTopicConfigs() {
assertThrows(NullPointerException.class,
() -> new ConfiguredInternalTopic("test-topic", 1, Optional.empty(), null));
}

@Test
public void testConstructorWithZeroPartitions() {
assertThrows(IllegalArgumentException.class,
() -> new ConfiguredInternalTopic("test-topic", 0, Optional.empty(), Collections.emptyMap()));
}

@Test
public void testAsStreamsGroupDescribeTopicInfo() {
String topicName = "test-topic";
Map<String, String> topicConfigs = new HashMap<>();
topicConfigs.put("retention.ms", "1000");
int numberOfPartitions = 3;
Optional<Short> replicationFactor = Optional.of((short) 2);
ConfiguredInternalTopic configuredInternalTopic = new ConfiguredInternalTopic(
topicName, numberOfPartitions, replicationFactor, topicConfigs);

StreamsGroupDescribeResponseData.TopicInfo topicInfo = configuredInternalTopic.asStreamsGroupDescribeTopicInfo();

assertEquals(topicName, topicInfo.name());
assertEquals(numberOfPartitions, topicInfo.partitions());
assertEquals(replicationFactor.orElse((short) 0).shortValue(), topicInfo.replicationFactor());
assertEquals(1, topicInfo.topicConfigs().size());
assertEquals("1000", topicInfo.topicConfigs().get(0).value());
}
}