From 1f56f79c931e6b8a75a18ac1a94ef7c567e8079d Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Mon, 13 Sep 2021 10:13:41 -0700 Subject: [PATCH] KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals (#11312) Some plugins make use of KafkaConfig#originals rather than the KafkaConfig object. We should ensure that these plugins see the correct value for broker.id if the broker is running in KRaft mode and node.id has been configured, but not broker.id. This PR does this by ensuring that both node.id and broker.id are set in the originals map if either one is set. We also check that they are set to the same value in KafkaConfig#validateValues. Co-author: Ron Dagostino --- .../main/scala/kafka/server/KafkaConfig.scala | 38 ++++++---- .../unit/kafka/server/KafkaConfigTest.scala | 72 +++++++++++++++++++ 2 files changed, 98 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 5dc071d51d2e4..ad9f577138e1f 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1385,13 +1385,31 @@ object KafkaConfig { } if (maybeSensitive) Password.HIDDEN else value } + + /** + * Copy a configuration map, populating some keys that we want to treat as synonyms. + */ + def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = { + val output = new util.HashMap[Any, Any](input) + val brokerId = output.get(KafkaConfig.BrokerIdProp) + val nodeId = output.get(KafkaConfig.NodeIdProp) + if (brokerId == null && nodeId != null) { + output.put(KafkaConfig.BrokerIdProp, nodeId) + } else if (brokerId != null && nodeId == null) { + output.put(KafkaConfig.NodeIdProp, brokerId) + } + output + } } -class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig]) +class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynamicConfigOverride: Option[DynamicBrokerConfig]) extends AbstractConfig(KafkaConfig.configDef, props, doLog) with Logging { - def this(props: java.util.Map[_, _]) = this(props, true, None) - def this(props: java.util.Map[_, _], doLog: Boolean) = this(props, doLog, None) + def this(props: java.util.Map[_, _]) = this(true, KafkaConfig.populateSynonyms(props), None) + def this(props: java.util.Map[_, _], doLog: Boolean) = this(doLog, KafkaConfig.populateSynonyms(props), None) + def this(props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig]) = + this(doLog, KafkaConfig.populateSynonyms(props), dynamicConfigOverride) + // Cache the current config to avoid acquiring read lock to access from dynamicConfig @volatile private var currentConfig = this private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this)) @@ -1512,15 +1530,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO /** ********* General Configuration ***********/ val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp) val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp) - var brokerId: Int = { - val nodeId = getInt(KafkaConfig.NodeIdProp) - if (nodeId < 0) { - getInt(KafkaConfig.BrokerIdProp) - } else { - nodeId - } - } - val nodeId: Int = brokerId + var brokerId: Int = getInt(KafkaConfig.BrokerIdProp) + val nodeId: Int = getInt(KafkaConfig.NodeIdProp) val processRoles: Set[ProcessRole] = parseProcessRoles() val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp) val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp) @@ -1901,6 +1912,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO @nowarn("cat=deprecation") private def validateValues(): Unit = { + if (nodeId != brokerId) { + throw new ConfigException(s"You must set `${KafkaConfig.NodeIdProp}` to the same value as `${KafkaConfig.BrokerIdProp}`.") + } if (requiresZookeeper) { if (zkConnect == null) { throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.") diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 2e38df00f11af..e94a4891f1f1b 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1235,4 +1235,76 @@ class KafkaConfigTest { assertEquals(dataDir1, config.metadataLogDir) assertEquals(Seq(dataDir1, dataDir2), config.logDirs) } + + @Test + def testPopulateSynonymsOnEmptyMap(): Unit = { + assertEquals(Collections.emptyMap(), KafkaConfig.populateSynonyms(Collections.emptyMap())) + } + + @Test + def testPopulateSynonymsOnMapWithoutNodeId(): Unit = { + val input = new util.HashMap[String, String]() + input.put(KafkaConfig.BrokerIdProp, "4") + val expectedOutput = new util.HashMap[String, String]() + expectedOutput.put(KafkaConfig.BrokerIdProp, "4") + expectedOutput.put(KafkaConfig.NodeIdProp, "4") + assertEquals(expectedOutput, KafkaConfig.populateSynonyms(input)) + } + + @Test + def testPopulateSynonymsOnMapWithoutBrokerId(): Unit = { + val input = new util.HashMap[String, String]() + input.put(KafkaConfig.NodeIdProp, "4") + val expectedOutput = new util.HashMap[String, String]() + expectedOutput.put(KafkaConfig.BrokerIdProp, "4") + expectedOutput.put(KafkaConfig.NodeIdProp, "4") + assertEquals(expectedOutput, KafkaConfig.populateSynonyms(input)) + } + + @Test + def testNodeIdMustNotBeDifferentThanBrokerId(): Unit = { + val props = new Properties() + props.setProperty(KafkaConfig.BrokerIdProp, "1") + props.setProperty(KafkaConfig.NodeIdProp, "2") + assertEquals("You must set `node.id` to the same value as `broker.id`.", + assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage()) + } + + @Test + def testNodeIdOrBrokerIdMustBeSetWithKraft(): Unit = { + val props = new Properties() + props.setProperty(KafkaConfig.ProcessRolesProp, "broker") + props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093") + assertEquals("Missing configuration `node.id` which is required when `process.roles` " + + "is defined (i.e. when running in KRaft mode).", + assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage()) + } + + @Test + def testNodeIdIsInferredByBrokerIdWithKraft(): Unit = { + val props = new Properties() + props.setProperty(KafkaConfig.ProcessRolesProp, "broker") + props.setProperty(KafkaConfig.BrokerIdProp, "3") + props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093") + val config = KafkaConfig.fromProps(props) + assertEquals(3, config.brokerId) + assertEquals(3, config.nodeId) + val originals = config.originals() + assertEquals("3", originals.get(KafkaConfig.BrokerIdProp)) + assertEquals("3", originals.get(KafkaConfig.NodeIdProp)) + } + + @Test + def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = { + val props = new Properties() + props.setProperty(KafkaConfig.ProcessRolesProp, "broker") + props.setProperty(KafkaConfig.NodeIdProp, "3") + props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9093") + val config = KafkaConfig.fromProps(props) + assertEquals(3, config.brokerId) + assertEquals(3, config.nodeId) + val originals = config.originals() + assertEquals("3", originals.get(KafkaConfig.BrokerIdProp)) + assertEquals("3", originals.get(KafkaConfig.NodeIdProp)) + } }