Skip to content

Commit

Permalink
KAFKA-18361 Remove PasswordEncoderConfigs (#18347)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
frankvicky authored Dec 30, 2024
1 parent af7f403 commit 3161115
Showing 9 changed files with 8 additions and 301 deletions.
36 changes: 2 additions & 34 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
@@ -213,11 +213,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val lock = new ReentrantReadWriteLock
private var metricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin] = _
private var currentConfig: KafkaConfig = _
private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
} else {
Some(PasswordEncoder.NOOP)
}
private val dynamicConfigPasswordEncoder = Some(PasswordEncoder.NOOP)

private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
currentConfig = new KafkaConfig(kafkaConfig.props, false)
@@ -373,16 +369,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
})
}

private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
secret.map { secret =>
PasswordEncoder.encrypting(secret,
kafkaConfig.passwordEncoderKeyFactoryAlgorithm,
kafkaConfig.passwordEncoderCipherAlgorithm,
kafkaConfig.passwordEncoderKeyLength,
kafkaConfig.passwordEncoderIterations)
}
}

private def passwordEncoder: PasswordEncoder = {
dynamicConfigPasswordEncoder.getOrElse(throw new ConfigException("Password encoder secret not configured"))
}
@@ -446,25 +432,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
// encoded using the current secret. Ignore any errors during decoding since old secret may not
// have been removed during broker restart.
private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = {
val props = persistentProps.clone().asInstanceOf[Properties]
if (props.asScala.keySet.exists(isPasswordConfig)) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder =>
persistentProps.asScala.foreachEntry { (configName, value) =>
if (isPasswordConfig(configName) && value != null) {
val decoded = try {
Some(passwordDecoder.decode(value).value)
} catch {
case _: Exception =>
debug(s"Dynamic password config $configName could not be decoded using old secret, new secret will be used.")
None
}
decoded.foreach(value => props.put(configName, passwordEncoder.encode(new Password(value))))
}
}
adminZkClient.changeBrokerConfig(Some(kafkaConfig.brokerId), props)
}
}
props
persistentProps.clone().asInstanceOf[Properties]
}

/**
9 changes: 0 additions & 9 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
@@ -40,7 +40,6 @@ import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, Transacti
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion
@@ -591,14 +590,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val delegationTokenExpiryTimeMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG)
val delegationTokenExpiryCheckIntervalMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG)

/** ********* Password encryption configuration for dynamic configs *********/
def passwordEncoderSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG))
def passwordEncoderOldSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG))
def passwordEncoderCipherAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG)
def passwordEncoderKeyFactoryAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG)
def passwordEncoderKeyLength = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG)
def passwordEncoderIterations = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG)

/** ********* Fetch Configuration **************/
val maxIncrementalFetchSessionCacheSlots = getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG)
val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG)
Original file line number Diff line number Diff line change
@@ -61,7 +61,6 @@
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG;
import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG;
import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
@@ -182,7 +181,6 @@ public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception {
configs.put("listener.name.external.ssl.keystore.password", "secret");
configs.put("log.cleaner.threads", "2");
// Password encoder configs
configs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");

// Password config update at default cluster-level should fail
assertThrows(ExecutionException.class,
Original file line number Diff line number Diff line change
@@ -27,13 +27,11 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.utils.TestUtils
import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigException, SaslConfigs, SslConfigs}
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -369,7 +367,6 @@ class DynamicBrokerConfigTest {

private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = {
val configProps = TestUtils.createBrokerConfig(0, null, port = 8181)
configProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "broker.secret")
val config = KafkaConfig(configProps)
config.dynamicConfig.initialize(None, None)

@@ -416,61 +413,6 @@ class DynamicBrokerConfigTest {
}
}

@Test
def testPasswordConfigNotEncryption(): Unit = {
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
val configWithoutSecret = KafkaConfig(props)
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret")
val configWithSecret = KafkaConfig(props)
val dynamicProps = new Properties
val password = "myLoginModule required;"
dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, password)

try {
configWithoutSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
} catch {
case _: ConfigException => // expected exception
}
val persistedProps = configWithSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
assertEquals(password, persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG))
}

@Test
def testPasswordConfigEncoderSecretChange(): Unit = {
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
props.put(SaslConfigs.SASL_JAAS_CONFIG, "staticLoginModule required;")
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret")
val config = KafkaConfig(props)
config.dynamicConfig.initialize(None, None)
val dynamicProps = new Properties
val password = "dynamicLoginModule required;"
dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, password)

val persistedProps = config.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
assertEquals(password, persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG))
config.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals(password, config.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)

// New config with same secret should use the dynamic password config
val newConfigWithSameSecret = KafkaConfig(props)
newConfigWithSameSecret.dynamicConfig.initialize(None, None)
newConfigWithSameSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals(password, newConfigWithSameSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)

// New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "new-encoder-secret")
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, "config-encoder-secret")
val newConfigWithNewAndOldSecret = KafkaConfig(props)
newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals(password, newConfigWithSameSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)

// New config with new secret alone should revert to static password config since dynamic config cannot be decoded
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "another-new-encoder-secret")
val newConfigWithNewSecret = KafkaConfig(props)
newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
}

@Test
def testDynamicListenerConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, null, port = 9092)
9 changes: 0 additions & 9 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
@@ -37,7 +37,6 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_8_2
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZkConfigs}
@@ -1005,14 +1004,6 @@ class KafkaConfigTest {
// Security config
case SecurityConfig.SECURITY_PROVIDERS_CONFIG =>

// Password encoder configs
case PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG =>
case PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG =>
case PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG =>
case PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG =>
case PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0")
case PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0")

//delegation token configs
case DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG => // ignore
case DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
4 changes: 4 additions & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
@@ -63,6 +63,10 @@ <h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4
</li>
<li>The <code>log.message.format.version</code> and <code>message.format.version</code> configs were removed.
</li>
<li>The password encoder related configs (<code>password.encoder.secret</code>, <code>password.encoder.old.secret</code>,
<code>password.encoder.keyfactory.algorithm</code>, <code>password.encoder.cipher.algorithm</code>,
<code>password.encoder.key.length</code>, and <code>password.encoder.iterations</code>) were removed.
</li>
<li>The function <code>onNewBatch</code> in <code>org.apache.kafka.clients.producer.Partitioner</code> class was removed.
</li>
</ul>

This file was deleted.

Loading

0 comments on commit 3161115

Please sign in to comment.