Skip to content

Commit

Permalink
KAFKA-17329: DefaultStatePersister implementation (apache#17270)
Browse files Browse the repository at this point in the history
Adds the DefaultStatePersister and other supporting classes for managing share state.

* Added DefaultStatePersister implementation. This is the entry point for callers who wish to invoke the share state RPC API.
* Added PersisterStateManager which is used by DefaultStatePersister to manage and send the RPCs over the network.
* Added code to BrokerServer and BrokerMetadataPublisher to instantiate the appropriate persister based on the config value for group.share.persister.class.name. If this is not specified, the DefaultStatePersister will be used. To force use of NoOpStatePersister, set the config to empty. This is an internal config, not to be exposed to the end user. This will be used to factory plug the appropriate persister.
* Using this persister, the internal __share_group_state topic will come to life and will be used for persistence of share group info.

Reviewers: Andrew Schofield <[email protected]>, Jun Rao <[email protected]>, David Arthur <[email protected]>
  • Loading branch information
smjn authored and chiacyu committed Nov 30, 2024
1 parent d90b58d commit 8f3f516
Show file tree
Hide file tree
Showing 24 changed files with 4,898 additions and 310 deletions.
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -995,9 +995,13 @@ project(':share') {
dependencies {
implementation project(':server-common')

testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output

implementation libs.slf4jApi

testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.slf4jReload4j

testRuntimeOnly libs.junitPlatformLanucher
Expand Down
8 changes: 8 additions & 0 deletions checkstyle/import-control-share.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<allow pkg="java" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.mockito" />

<!-- no one depends on the server -->
<disallow pkg="kafka" />
Expand All @@ -47,6 +48,13 @@
<subpackage name="fetch">
<allow class="org.apache.kafka.server.storage.log.FetchParams"/>
</subpackage>

<subpackage name="persister">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
</subpackage>
</subpackage>

Expand Down
105 changes: 73 additions & 32 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher, ShareCoordinatorMetadataCacheHelperImpl}
import kafka.server.share.SharePartitionManager
import kafka.utils.CoreUtils
import org.apache.kafka.common.config.ConfigException
Expand All @@ -46,7 +46,7 @@ import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetric
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, TopicIdPartition}
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.share.persister.{NoOpShareStatePersister, Persister}
import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpShareStatePersister, Persister, PersisterStateManager}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
Expand Down Expand Up @@ -124,7 +124,7 @@ class BrokerServer(

var transactionCoordinator: TransactionCoordinator = _

var shareCoordinator: Option[ShareCoordinator] = _
var shareCoordinator: Option[ShareCoordinator] = None

var clientToControllerChannelManager: NodeToControllerChannelManager = _

Expand Down Expand Up @@ -356,8 +356,12 @@ class BrokerServer(
/* initializing the groupConfigManager */
groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig))

/* create share coordinator */
shareCoordinator = createShareCoordinator()

/* create persister */
persister = createShareStatePersister()

groupCoordinator = createGroupCoordinator()

val producerIdManagerSupplier = () => ProducerIdManager.rpc(
Expand Down Expand Up @@ -423,8 +427,6 @@ class BrokerServer(
config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)

persister = NoOpShareStatePersister.getInstance()

sharePartitionManager = new SharePartitionManager(
replicaManager,
time,
Expand All @@ -435,7 +437,7 @@ class BrokerServer(
config.shareGroupConfig.shareFetchMaxFetchRecords,
persister,
groupConfigManager,
new Metrics()
metrics
)

// Create the request processor objects.
Expand Down Expand Up @@ -646,33 +648,68 @@ class BrokerServer(
}

private def createShareCoordinator(): Option[ShareCoordinator] = {
if (!config.shareGroupConfig.isShareGroupEnabled) {
return None
if (config.shareGroupConfig.isShareGroupEnabled &&
config.shareGroupConfig.shareGroupPersisterClassName().nonEmpty) {
val time = Time.SYSTEM
val timer = new SystemTimerReaper(
"share-coordinator-reaper",
new SystemTimer("share-coordinator")
)

val serde = new ShareCoordinatorRecordSerde
val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
time,
replicaManager,
serde,
config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
)
val writer = new CoordinatorPartitionWriter(
replicaManager
)
Some(new ShareCoordinatorService.Builder(config.brokerId, config.shareCoordinatorConfig)
.withTimer(timer)
.withTime(time)
.withLoader(loader)
.withWriter(writer)
.withCoordinatorRuntimeMetrics(new ShareCoordinatorRuntimeMetrics(metrics))
.withCoordinatorMetrics(new ShareCoordinatorMetrics(metrics))
.build())
} else {
None
}
}

private def createShareStatePersister(): Persister = {
if (config.shareGroupConfig.isShareGroupEnabled &&
config.shareGroupConfig.shareGroupPersisterClassName.nonEmpty) {
val klass = Utils.loadClass(config.shareGroupConfig.shareGroupPersisterClassName, classOf[Object]).asInstanceOf[Class[Persister]]

if (klass.getName.equals(classOf[DefaultStatePersister].getName)) {
klass.getConstructor(classOf[PersisterStateManager])
.newInstance(
new PersisterStateManager(
NetworkUtils.buildNetworkClient("Persister", config, metrics, Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => shareCoordinator.get.partitionFor(key), config.interBrokerListenerName),
Time.SYSTEM,
new SystemTimerReaper(
"persister-state-manager-reaper",
new SystemTimer("persister")
)
)
)
} else if (klass.getName.equals(classOf[NoOpShareStatePersister].getName)) {
info("Using no op persister")
new NoOpShareStatePersister()
} else {
error("Unknown persister specified. Persister is only factory pluggable!")
throw new IllegalArgumentException("Unknown persiser specified " + config.shareGroupConfig.shareGroupPersisterClassName)
}
} else {
// in case share coordinator not enabled or
// persister class name deliberately empty (key=)
info("Using no op persister")
new NoOpShareStatePersister()
}
val time = Time.SYSTEM
val timer = new SystemTimerReaper(
"share-coordinator-reaper",
new SystemTimer("share-coordinator")
)

val serde = new ShareCoordinatorRecordSerde
val loader = new CoordinatorLoaderImpl[CoordinatorRecord](
time,
replicaManager,
serde,
config.shareCoordinatorConfig.shareCoordinatorLoadBufferSize()
)
val writer = new CoordinatorPartitionWriter(
replicaManager
)
Some(new ShareCoordinatorService.Builder(config.brokerId, config.shareCoordinatorConfig)
.withTimer(timer)
.withTime(time)
.withLoader(loader)
.withWriter(writer)
.withCoordinatorRuntimeMetrics(new ShareCoordinatorRuntimeMetrics(metrics))
.withCoordinatorMetrics(new ShareCoordinatorMetrics(metrics))
.build())
}

protected def createRemoteLogManager(): Option[RemoteLogManager] = {
Expand Down Expand Up @@ -778,9 +815,13 @@ class BrokerServer(

if (socketServer != null)
CoreUtils.swallow(socketServer.shutdown(), this)

Utils.closeQuietly(brokerTopicStats, "broker topic stats")
Utils.closeQuietly(sharePartitionManager, "share partition manager")

if (persister != null)
CoreUtils.swallow(persister.stop(), this)

isShuttingDown.set(false)

CoreUtils.swallow(lifecycleManager.close(), this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ class BrokerMetadataPublisher(
s"coordinator with local changes in $deltaName", t)
}

try {
// Propagate the new image to the share coordinator.
shareCoordinator.foreach(coordinator => coordinator.onNewMetadataImage(newImage, delta))
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating share " +
s"coordinator with local changes in $deltaName", t)
}

if (_firstPublish) {
finishInitializingReplicaManager()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server.metadata;

import kafka.server.MetadataCache;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;

import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

import scala.jdk.javaapi.CollectionConverters;
import scala.jdk.javaapi.OptionConverters;

public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinatorMetadataCacheHelper {
private final MetadataCache metadataCache;
private final Function<String, Integer> keyToPartitionMapper;
private final ListenerName interBrokerListenerName;

public ShareCoordinatorMetadataCacheHelperImpl(
MetadataCache metadataCache,
Function<String, Integer> keyToPartitionMapper,
ListenerName interBrokerListenerName
) {
Objects.requireNonNull(metadataCache, "metadataCache must not be null");
Objects.requireNonNull(keyToPartitionMapper, "keyToPartitionMapper must not be null");
Objects.requireNonNull(interBrokerListenerName, "interBrokerListenerName must not be null");

this.metadataCache = metadataCache;
this.keyToPartitionMapper = keyToPartitionMapper;
this.interBrokerListenerName = interBrokerListenerName;
}

@Override
public boolean containsTopic(String topic) {
return metadataCache.contains(topic);
}

@Override
public Node getShareCoordinator(String key, String internalTopicName) {
if (metadataCache.contains(internalTopicName)) {
Set<String> topicSet = new HashSet<>();
topicSet.add(internalTopicName);

List<MetadataResponseData.MetadataResponseTopic> topicMetadata = CollectionConverters.asJava(
metadataCache.getTopicMetadata(
CollectionConverters.asScala(topicSet),
interBrokerListenerName,
false,
false
)
);

if (topicMetadata == null || topicMetadata.isEmpty() || topicMetadata.get(0).errorCode() != Errors.NONE.code()) {
return Node.noNode();
} else {
int partition = keyToPartitionMapper.apply(key);
Optional<MetadataResponseData.MetadataResponsePartition> response = topicMetadata.get(0).partitions().stream()
.filter(responsePart -> responsePart.partitionIndex() == partition
&& responsePart.leaderId() != MetadataResponse.NO_LEADER_ID)
.findFirst();

if (response.isPresent()) {
return OptionConverters.toJava(metadataCache.getAliveBrokerNode(response.get().leaderId(), interBrokerListenerName))
.orElse(Node.noNode());
} else {
return Node.noNode();
}
}
}
return Node.noNode();
}

@Override
public List<Node> getClusterNodes() {
return CollectionConverters.asJava(metadataCache.getAliveBrokerNodes(interBrokerListenerName).toSeq());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ public void testCloseSharePartitionManager() throws Exception {
Timer timer = Mockito.mock(SystemTimerReaper.class);
Persister persister = Mockito.mock(Persister.class);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withTimer(timer).withShareGroupPersister(persister).build();
.withTimer(timer).withShareGroupPersister(persister).build();

// Verify that 0 calls are made to timer.close() and persister.stop().
Mockito.verify(timer, times(0)).close();
Expand Down Expand Up @@ -2270,7 +2270,7 @@ static class SharePartitionManagerBuilder {
private Time time = new MockTime();
private ShareSessionCache cache = new ShareSessionCache(10, 1000);
private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
private Persister persister = NoOpShareStatePersister.getInstance();
private Persister persister = new NoOpShareStatePersister();
private Timer timer = new MockTimer();
private Metrics metrics = new Metrics();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5127,7 +5127,8 @@ private static class SharePartitionBuilder {
private int defaultAcquisitionLockTimeoutMs = 30000;
private int maxDeliveryCount = MAX_DELIVERY_COUNT;
private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES;
private Persister persister = NoOpShareStatePersister.getInstance();

private Persister persister = new NoOpShareStatePersister();
private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
private GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);

Expand Down
Loading

0 comments on commit 8f3f516

Please sign in to comment.