Skip to content

Commit

Permalink
KC-1310: Remove enable.metadata.quorum from the kip-500 branch
Browse files Browse the repository at this point in the history
  • Loading branch information
cmccabe committed Jan 19, 2021
1 parent 3522a2b commit 25716aa
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 29 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class KafkaApis(val requestChannel: RequestChannel,
apisUtils.sendErrorResponseMaybeThrottle(envelope, error.exception)
}

if (!config.metadataQuorumEnabled || !envelope.context.fromPrivilegedListener) {
if (!config.quorumControlPlaneEnabled || !envelope.context.fromPrivilegedListener) {
// If the forwarded request does not come from a privileged listener, or
// forwarding is not yet enabled, we do not handle the request.
requestChannel.closeConnection(envelope, Collections.emptyMap())
Expand All @@ -164,7 +164,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}

/**
 * Determines whether this request may be forwarded to the controller quorum.
 *
 * Forwarding is possible only when the self-managed quorum control plane
 * (KIP-500 mode) is enabled, the request context carries a principal serde
 * (needed to wrap the request in an envelope), and the API itself is
 * marked forwardable.
 *
 * Note: the stale reference to `config.metadataQuorumEnabled` (removed by
 * this change, along with the redundant `config.processRoles.nonEmpty`
 * check — `quorumControlPlaneEnabled` is defined as `processRoles.nonEmpty`)
 * has been dropped so the method compiles against the updated KafkaConfig.
 */
private def isForwardingEnabled(request: RequestChannel.Request): Boolean = {
  config.quorumControlPlaneEnabled &&
  request.context.principalSerde.isPresent &&
  request.header.apiKey().forwardable
}
Expand Down Expand Up @@ -1372,7 +1372,7 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(completeTopicMetadata.mkString(","),
brokers.mkString(","), request.header.correlationId, request.header.clientId))

val controllerId = if (config.processRoles.nonEmpty) {
val controllerId = if (config.quorumControlPlaneEnabled) {
// When running in KIP-500 mode, we send back a random controller ID, reflecting the
// fact that requests for the controller can be sent to any node.
// TODO: new clients could be smarter about this and understand when it is and is
Expand Down
8 changes: 1 addition & 7 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ object KafkaConfig {
val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
val ConnectionSetupTimeoutMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
val ConnectionSetupTimeoutMaxMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
val EnableMetadataQuorumProp = "enable.metadata.quorum"
val ProcessRolesProp = "process.roles"
val ControllerQuorumVotersProp = "controller.quorum.voters"
val InitialBrokerRegistrationTimeoutMs = "initial.broker.registration.timeout.ms"
Expand Down Expand Up @@ -1062,9 +1061,6 @@ object KafkaConfig {
.define(BrokerSessionTimeoutMsProp, INT, Defaults.BrokerSessionTimeoutMs, MEDIUM, BrokerSessionTimeoutMsDoc)
.define(MetadataLogDirProp, STRING, null, HIGH, MetadataLogDirDoc)

// Experimental flag to turn on APIs required for the internal metadata quorum (KIP-500)
.defineInternal(EnableMetadataQuorumProp, BOOLEAN, false, LOW)

/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)

Expand Down Expand Up @@ -1520,6 +1516,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val connectionSetupTimeoutMs = getLong(KafkaConfig.ConnectionSetupTimeoutMsProp)
val connectionSetupTimeoutMaxMs = getLong(KafkaConfig.ConnectionSetupTimeoutMaxMsProp)
val processRoles = parseProcessRoles()
val quorumControlPlaneEnabled = processRoles.nonEmpty
val controllerQuorumVoters = getString(KafkaConfig.ControllerQuorumVotersProp)
val initialRegistrationTimeoutMs = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMs)
val brokerHeartbeatIntervalMs = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
Expand Down Expand Up @@ -1652,9 +1649,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
/** ********* Feature configuration ***********/
def isFeatureVersioningSupported = interBrokerProtocolVersion >= KAFKA_2_7_IV0

/** ********* Experimental metadata quorum configuration ***********/
def metadataQuorumEnabled = getBoolean(KafkaConfig.EnableMetadataQuorumProp)

/** ********* Group coordinator configuration ***********/
val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
val groupMaxSessionTimeoutMs = getInt(KafkaConfig.GroupMaxSessionTimeoutMsProp)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/Kip500Broker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class Kip500Broker(
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
socketServer = new SocketServer(config, metrics, time, credentialProvider, Some(config.brokerId),
Some(new LogContext(s"[SocketServer brokerId=${config.controllerId}] ")))
Some(new LogContext(s"[SocketServer brokerId=${config.controllerId}] ")), allowDisabledApis = true)
socketServer.startup(startProcessingRequests = false)

// Create replica manager.
Expand Down
16 changes: 7 additions & 9 deletions core/src/main/scala/kafka/server/LegacyBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class LegacyBroker(val config: KafkaConfig,
// Note that disabled APIs are now always allowed, since the experimental
// enable.metadata.quorum flag that previously gated this has been removed.
socketServer = new SocketServer(config, metrics, time, credentialProvider,
allowDisabledApis = config.metadataQuorumEnabled)
allowDisabledApis = true)
socketServer.startup(startProcessingRequests = false)

/* start replica manager */
Expand All @@ -286,14 +286,12 @@ class LegacyBroker(val config: KafkaConfig,
kafkaController.startup()

var forwardingManager: ForwardingManager = null
if (config.metadataQuorumEnabled) {
/* start forwarding manager */
forwardingChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
time, metrics, config, 60000, "forwarding", threadNamePrefix)
forwardingChannelManager.start()
forwardingManager = new ForwardingManager(forwardingChannelManager, time,
config.requestTimeoutMs.longValue(), logContext)
}
/* start forwarding manager */
forwardingChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
time, metrics, config, 60000, "forwarding", threadNamePrefix)
forwardingChannelManager.start()
forwardingManager = new ForwardingManager(forwardingChannelManager, time,
config.requestTimeoutMs.longValue(), logContext)

adminManager = new LegacyAdminManager(config, metrics, metadataCache, zkClient)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,13 @@

package kafka.server

import java.util.Properties

import org.apache.kafka.common.protocol.Errors
import org.junit.Assert.assertEquals
import org.junit.Test

import scala.jdk.CollectionConverters._

class CreateTopicsRequestWithForwardingTest extends AbstractCreateTopicsRequestTest {

override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.put(KafkaConfig.EnableMetadataQuorumProp, true.toString)
}

@Test
def testForwardToController(): Unit = {
val req = topicsReq(Seq(topicReq("topic1")))
Expand Down
3 changes: 1 addition & 2 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ class KafkaApisTest {
val properties = TestUtils.createBrokerConfig(brokerId, "zk")
properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
properties.put(KafkaConfig.LogMessageFormatVersionProp, interBrokerProtocolVersion.toString)
properties.put(KafkaConfig.EnableMetadataQuorumProp, enableForwarding.toString)
new KafkaApis(requestChannel,
replicaManager,
adminManager,
Expand Down Expand Up @@ -415,7 +414,7 @@ class KafkaApisTest {
results.foreach(r => {
assertEquals(Errors.NONE.code, r.errorCode)
assertFalse("Should return configs", r.configs.isEmpty)
r.configs.asScala.filter { c => c.name != KafkaConfig.EnableMetadataQuorumProp }.foreach(c => {
r.configs.asScala.foreach(c => {
assertNotNull(s"Config ${c.name} should have non null documentation", c.documentation)
assertNotEquals(s"Config ${c.name} should have non blank documentation", "", c.documentation.trim)
})
Expand Down

0 comments on commit 25716aa

Please sign in to comment.