Skip to content

Commit

Permalink
KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals (a…
Browse files Browse the repository at this point in the history
…pache#11312)

Some plugins make use of KafkaConfig#originals rather than the
KafkaConfig object. We should ensure that these plugins see the
correct value for broker.id if the broker is running in KRaft mode and
node.id has been configured, but not broker.id.

This PR does this by ensuring that both node.id and broker.id are set in
the originals map if either one is set.  We also check that they are set
to the same value in KafkaConfig#validateValues.

Co-author: Ron Dagostino <[email protected]>
  • Loading branch information
cmccabe authored and Ralph Debusmann committed Dec 22, 2021
1 parent 690d702 commit 1f56f79
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 12 deletions.
38 changes: 26 additions & 12 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1385,13 +1385,31 @@ object KafkaConfig {
}
if (maybeSensitive) Password.HIDDEN else value
}

/**
* Copy a configuration map, populating some keys that we want to treat as synonyms.
*/
def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {
val output = new util.HashMap[Any, Any](input)
val brokerId = output.get(KafkaConfig.BrokerIdProp)
val nodeId = output.get(KafkaConfig.NodeIdProp)
if (brokerId == null && nodeId != null) {
output.put(KafkaConfig.BrokerIdProp, nodeId)
} else if (brokerId != null && nodeId == null) {
output.put(KafkaConfig.NodeIdProp, brokerId)
}
output
}
}

class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig])
class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynamicConfigOverride: Option[DynamicBrokerConfig])
extends AbstractConfig(KafkaConfig.configDef, props, doLog) with Logging {

def this(props: java.util.Map[_, _]) = this(props, true, None)
def this(props: java.util.Map[_, _], doLog: Boolean) = this(props, doLog, None)
def this(props: java.util.Map[_, _]) = this(true, KafkaConfig.populateSynonyms(props), None)
def this(props: java.util.Map[_, _], doLog: Boolean) = this(doLog, KafkaConfig.populateSynonyms(props), None)
def this(props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig]) =
this(doLog, KafkaConfig.populateSynonyms(props), dynamicConfigOverride)

// Cache the current config to avoid acquiring read lock to access from dynamicConfig
@volatile private var currentConfig = this
private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
Expand Down Expand Up @@ -1512,15 +1530,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
/** ********* General Configuration ***********/
val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
var brokerId: Int = {
val nodeId = getInt(KafkaConfig.NodeIdProp)
if (nodeId < 0) {
getInt(KafkaConfig.BrokerIdProp)
} else {
nodeId
}
}
val nodeId: Int = brokerId
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
val processRoles: Set[ProcessRole] = parseProcessRoles()
val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp)
val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
Expand Down Expand Up @@ -1901,6 +1912,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO

@nowarn("cat=deprecation")
private def validateValues(): Unit = {
if (nodeId != brokerId) {
throw new ConfigException(s"You must set `${KafkaConfig.NodeIdProp}` to the same value as `${KafkaConfig.BrokerIdProp}`.")
}
if (requiresZookeeper) {
if (zkConnect == null) {
throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.")
Expand Down
72 changes: 72 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1235,4 +1235,76 @@ class KafkaConfigTest {
assertEquals(dataDir1, config.metadataLogDir)
assertEquals(Seq(dataDir1, dataDir2), config.logDirs)
}

@Test
def testPopulateSynonymsOnEmptyMap(): Unit = {
assertEquals(Collections.emptyMap(), KafkaConfig.populateSynonyms(Collections.emptyMap()))
}

@Test
def testPopulateSynonymsOnMapWithoutNodeId(): Unit = {
val input = new util.HashMap[String, String]()
input.put(KafkaConfig.BrokerIdProp, "4")
val expectedOutput = new util.HashMap[String, String]()
expectedOutput.put(KafkaConfig.BrokerIdProp, "4")
expectedOutput.put(KafkaConfig.NodeIdProp, "4")
assertEquals(expectedOutput, KafkaConfig.populateSynonyms(input))
}

@Test
def testPopulateSynonymsOnMapWithoutBrokerId(): Unit = {
val input = new util.HashMap[String, String]()
input.put(KafkaConfig.NodeIdProp, "4")
val expectedOutput = new util.HashMap[String, String]()
expectedOutput.put(KafkaConfig.BrokerIdProp, "4")
expectedOutput.put(KafkaConfig.NodeIdProp, "4")
assertEquals(expectedOutput, KafkaConfig.populateSynonyms(input))
}

@Test
def testNodeIdMustNotBeDifferentThanBrokerId(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.NodeIdProp, "2")
assertEquals("You must set `node.id` to the same value as `broker.id`.",
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage())
}

@Test
def testNodeIdOrBrokerIdMustBeSetWithKraft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
assertEquals("Missing configuration `node.id` which is required when `process.roles` " +
"is defined (i.e. when running in KRaft mode).",
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage())
}

@Test
def testNodeIdIsInferredByBrokerIdWithKraft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
props.setProperty(KafkaConfig.BrokerIdProp, "3")
props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
val config = KafkaConfig.fromProps(props)
assertEquals(3, config.brokerId)
assertEquals(3, config.nodeId)
val originals = config.originals()
assertEquals("3", originals.get(KafkaConfig.BrokerIdProp))
assertEquals("3", originals.get(KafkaConfig.NodeIdProp))
}

@Test
def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
props.setProperty(KafkaConfig.NodeIdProp, "3")
props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9093")
val config = KafkaConfig.fromProps(props)
assertEquals(3, config.brokerId)
assertEquals(3, config.nodeId)
val originals = config.originals()
assertEquals("3", originals.get(KafkaConfig.BrokerIdProp))
assertEquals("3", originals.get(KafkaConfig.NodeIdProp))
}
}

0 comments on commit 1f56f79

Please sign in to comment.