From e035f7036d52cb0c1c3a75209ba831c330011b2f Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Thu, 14 Nov 2024 15:12:11 -0800 Subject: [PATCH] MINOR: convert SaslClientsWithInvalidCredentialsTest + MultipleListenersWithSameSecurityProtocolBaseTest to KRaft (#17803) Reviewers: Justine Olshan --- ...aslClientsWithInvalidCredentialsTest.scala | 37 +++++++++++------- ...nersWithSameSecurityProtocolBaseTest.scala | 39 +++++++++++++------ 2 files changed, 50 insertions(+), 26 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index 1a4451c2b0a44..af3f030648fad 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -23,17 +23,18 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.SaslAuthenticationException -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.api.Assertions._ import kafka.utils.{TestInfoUtils, TestUtils} -import kafka.zk.ConfigEntityChangeNotificationZNode import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig +import org.apache.kafka.metadata.storage.Formatter import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{MethodSource, ValueSource} import scala.jdk.javaapi.OptionConverters +import scala.util.Using class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { @@ -57,9 +58,11 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = { super.configureSecurityBeforeServersStart(testInfo) - zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) - // Create broker credentials before starting brokers - createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD) + } + + override def addFormatterSettings(formatter: Formatter): Unit = { + formatter.setScramArguments( + List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]").asJava) } override def createPrivilegedAdminClient() = { @@ -72,7 +75,11 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME)) super.setUp(testInfo) - createTopic(topic, numPartitions, brokerCount) + Using(createPrivilegedAdminClient()) { superuserAdminClient => + TestUtils.createTopicWithAdmin( + superuserAdminClient, topic, brokers, controllerServers, numPartitions + ) + } } @AfterEach @@ -81,7 +88,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { closeSasl() } - @ParameterizedTest + @ParameterizedTest(name="{displayName}.quorum=kraft.isIdempotenceEnabled={0}") @ValueSource(booleans = Array(true, false)) def testProducerWithAuthenticationFailure(isIdempotenceEnabled: Boolean): Unit = { val prop = new Properties() @@ -101,8 +108,9 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { verifyWithRetry(sendOneRecord(producer2)) } - @Test - def testTransactionalProducerWithAuthenticationFailure(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + def testTransactionalProducerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { val txProducer = createTransactionalProducer() verifyAuthenticationException(txProducer.initTransactions()) @@ -111,7 +119,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) def testConsumerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { val consumer = createConsumer() consumer.subscribe(List(topic).asJava) @@ -119,7 +127,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) def testManualAssignmentConsumerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { val consumer = createConsumer() consumer.assign(List(tp).asJava) @@ -127,7 +135,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) def testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString) val consumer = createConsumer() @@ -146,8 +154,9 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { verifyWithRetry(assertEquals(1, consumer.poll(Duration.ofMillis(1000)).count)) } - @Test - def testKafkaAdminClientWithAuthenticationFailure(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + def testKafkaAdminClientWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = { val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol, OptionConverters.toJava(trustStoreFile), OptionConverters.toJava(clientSaslProperties)) props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) val adminClient = Admin.create(props) diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala index 16d1454604ecc..ec44af7b2364d 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala @@ -25,13 +25,14 @@ import kafka.security.JaasTestUtils import kafka.security.JaasTestUtils.JaasSection import kafka.utils.{TestInfoUtils, TestUtils} import kafka.utils.Implicits._ +import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic} import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.{SaslConfigs, SslConfigs} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.network.{ConnectionMode, ListenerName} -import org.apache.kafka.server.config.{ReplicationConfigs, ZkConfigs} +import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.network.SocketServerConfigs import org.junit.jupiter.api.Assertions.assertEquals @@ -57,7 +58,8 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT import MultipleListenersWithSameSecurityProtocolBaseTest._ private val trustStoreFile = TestUtils.tempFile("truststore", ".jks") - private val servers = new ArrayBuffer[KafkaServer] + private val servers = new ArrayBuffer[KafkaBroker] + private var admin: Admin = null private val producers = mutable.Map[ClientMetadata, KafkaProducer[Array[Byte], Array[Byte]]]() private val consumers = mutable.Map[ClientMetadata, Consumer[Array[Byte], Array[Byte]]]() @@ -78,14 +80,15 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT (0 until numServers).foreach { brokerId => - val props = TestUtils.createBrokerConfig(brokerId, zkConnect, trustStoreFile = Some(trustStoreFile)) + val props = TestUtils.createBrokerConfig(brokerId, null, trustStoreFile = Some(trustStoreFile)) // Ensure that we can support multiple listeners per security protocol and multiple security protocols props.put(SocketServerConfigs.LISTENERS_CONFIG, s"$SecureInternal://localhost:0, $Internal://localhost:0, " + s"$SecureExternal://localhost:0, $External://localhost:0") + props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, props.get(SocketServerConfigs.LISTENERS_CONFIG)) props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"$Internal:PLAINTEXT, $SecureInternal:SASL_SSL," + - s"$External:PLAINTEXT, $SecureExternal:SASL_SSL") + s"$External:PLAINTEXT, $SecureExternal:SASL_SSL, CONTROLLER:PLAINTEXT") + props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") props.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, Internal) - props.put(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") props.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, kafkaClientSaslMechanism) props.put(s"${new ListenerName(SecureInternal).configPrefix}${BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG}", kafkaServerSaslMechanisms(SecureInternal).mkString(",")) @@ -103,7 +106,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT } props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path") - servers += TestUtils.createServer(KafkaConfig.fromProps(props)) + servers += createBroker(KafkaConfig.fromProps(props)) } servers.map(_.config).foreach { config => @@ -113,10 +116,20 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT s"Unexpected ${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} for broker ${config.brokerId}") } - TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DEFAULT, - replicationFactor = 2, servers, servers.head.groupCoordinator.groupMetadataTopicConfigs) - - createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_USER, JaasTestUtils.KAFKA_SCRAM_PASSWORD) + val adminClientConfig = new java.util.HashMap[String, Object]() + adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, + TestUtils.bootstrapServers(servers, new ListenerName(Internal))) + admin = Admin.create(adminClientConfig) + val newTopic = new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, + GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DEFAULT, 2.toShort) + val newTopicConfigs = new java.util.HashMap[String, String]() + servers.head.groupCoordinator.groupMetadataTopicConfigs.entrySet(). + forEach(e => newTopicConfigs.put(e.getKey.toString, e.getValue.toString)) + newTopic.configs(newTopicConfigs) + admin.createTopics(java.util.Arrays.asList(newTopic)).all().get(5, TimeUnit.MINUTES) + + createScramCredentials(admin, JaasTestUtils.KAFKA_SCRAM_USER, JaasTestUtils.KAFKA_SCRAM_PASSWORD) + TestUtils.ensureConsistentKRaftMetadata(servers, controllerServer) servers.head.config.listeners.foreach { endPoint => val listenerName = endPoint.listenerName @@ -130,7 +143,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT def addProducerConsumer(listenerName: ListenerName, mechanism: String, saslProps: Option[Properties]): Unit = { val topic = s"${listenerName.value}${producers.size}" - TestUtils.createTopic(zkClient, topic, 2, 2, servers) + admin.createTopics(java.util.Arrays.asList(new NewTopic(topic, 2, 2.toShort))).all().get(5, TimeUnit.MINUTES) val clientMetadata = ClientMetadata(listenerName, mechanism, topic) producers(clientMetadata) = TestUtils.createProducer(bootstrapServers, acks = -1, @@ -153,6 +166,8 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT @AfterEach override def tearDown(): Unit = { + Option(admin).foreach(_.close()) + admin = null producers.values.foreach(_.close()) consumers.values.foreach(_.close()) TestUtils.shutdownServers(servers) @@ -165,7 +180,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT * with acks=-1 to ensure that replication is also working. */ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) def testProduceConsume(quorum: String, groupProtocol: String): Unit = { producers.foreach { case (clientMetadata, producer) => val producerRecords = (1 to 10).map(i => new ProducerRecord(clientMetadata.topic, s"key$i".getBytes,