Skip to content

Commit

Permalink
MINOR: convert SaslClientsWithInvalidCredentialsTest + MultipleListen…
Browse files Browse the repository at this point in the history
…ersWithSameSecurityProtocolBaseTest to KRaft (#17803)

Reviewers: Justine Olshan <[email protected]>
  • Loading branch information
cmccabe authored Nov 14, 2024
1 parent 6e9c56a commit e035f70
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.SaslAuthenticationException
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.storage.Formatter
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{MethodSource, ValueSource}

import scala.jdk.javaapi.OptionConverters
import scala.util.Using


class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
Expand All @@ -57,9 +58,11 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {

override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
super.configureSecurityBeforeServersStart(testInfo)
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
// Create broker credentials before starting brokers
createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
}

override def addFormatterSettings(formatter: Formatter): Unit = {
formatter.setScramArguments(
List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]").asJava)
}

override def createPrivilegedAdminClient() = {
Expand All @@ -72,7 +75,11 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
super.setUp(testInfo)
createTopic(topic, numPartitions, brokerCount)
Using(createPrivilegedAdminClient()) { superuserAdminClient =>
TestUtils.createTopicWithAdmin(
superuserAdminClient, topic, brokers, controllerServers, numPartitions
)
}
}

@AfterEach
Expand All @@ -81,7 +88,7 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
closeSasl()
}

@ParameterizedTest
@ParameterizedTest(name="{displayName}.quorum=kraft.isIdempotenceEnabled={0}")
@ValueSource(booleans = Array(true, false))
def testProducerWithAuthenticationFailure(isIdempotenceEnabled: Boolean): Unit = {
val prop = new Properties()
Expand All @@ -101,8 +108,9 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
verifyWithRetry(sendOneRecord(producer2))
}

@Test
def testTransactionalProducerWithAuthenticationFailure(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testTransactionalProducerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
val txProducer = createTransactionalProducer()
verifyAuthenticationException(txProducer.initTransactions())

Expand All @@ -111,23 +119,23 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testConsumerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer()
consumer.subscribe(List(topic).asJava)
verifyConsumerWithAuthenticationFailure(consumer)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testManualAssignmentConsumerWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
verifyConsumerWithAuthenticationFailure(consumer)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString)
val consumer = createConsumer()
Expand All @@ -146,8 +154,9 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
verifyWithRetry(assertEquals(1, consumer.poll(Duration.ofMillis(1000)).count))
}

@Test
def testKafkaAdminClientWithAuthenticationFailure(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testKafkaAdminClientWithAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol, OptionConverters.toJava(trustStoreFile), OptionConverters.toJava(clientSaslProperties))
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val adminClient = Admin.create(props)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import kafka.security.JaasTestUtils
import kafka.security.JaasTestUtils.JaasSection
import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.Implicits._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.server.config.{ReplicationConfigs, ZkConfigs}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.network.SocketServerConfigs
import org.junit.jupiter.api.Assertions.assertEquals
Expand All @@ -57,7 +58,8 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
import MultipleListenersWithSameSecurityProtocolBaseTest._

private val trustStoreFile = TestUtils.tempFile("truststore", ".jks")
private val servers = new ArrayBuffer[KafkaServer]
private val servers = new ArrayBuffer[KafkaBroker]
private var admin: Admin = null
private val producers = mutable.Map[ClientMetadata, KafkaProducer[Array[Byte], Array[Byte]]]()
private val consumers = mutable.Map[ClientMetadata, Consumer[Array[Byte], Array[Byte]]]()

Expand All @@ -78,14 +80,15 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT

(0 until numServers).foreach { brokerId =>

val props = TestUtils.createBrokerConfig(brokerId, zkConnect, trustStoreFile = Some(trustStoreFile))
val props = TestUtils.createBrokerConfig(brokerId, null, trustStoreFile = Some(trustStoreFile))
// Ensure that we can support multiple listeners per security protocol and multiple security protocols
props.put(SocketServerConfigs.LISTENERS_CONFIG, s"$SecureInternal://localhost:0, $Internal://localhost:0, " +
s"$SecureExternal://localhost:0, $External://localhost:0")
props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, props.get(SocketServerConfigs.LISTENERS_CONFIG))
props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"$Internal:PLAINTEXT, $SecureInternal:SASL_SSL," +
s"$External:PLAINTEXT, $SecureExternal:SASL_SSL")
s"$External:PLAINTEXT, $SecureExternal:SASL_SSL, CONTROLLER:PLAINTEXT")
props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, Internal)
props.put(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
props.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, kafkaClientSaslMechanism)
props.put(s"${new ListenerName(SecureInternal).configPrefix}${BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG}",
kafkaServerSaslMechanisms(SecureInternal).mkString(","))
Expand All @@ -103,7 +106,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
}
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")

servers += TestUtils.createServer(KafkaConfig.fromProps(props))
servers += createBroker(KafkaConfig.fromProps(props))
}

servers.map(_.config).foreach { config =>
Expand All @@ -113,10 +116,20 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
s"Unexpected ${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} for broker ${config.brokerId}")
}

TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DEFAULT,
replicationFactor = 2, servers, servers.head.groupCoordinator.groupMetadataTopicConfigs)

createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_USER, JaasTestUtils.KAFKA_SCRAM_PASSWORD)
val adminClientConfig = new java.util.HashMap[String, Object]()
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
TestUtils.bootstrapServers(servers, new ListenerName(Internal)))
admin = Admin.create(adminClientConfig)
val newTopic = new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME,
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DEFAULT, 2.toShort)
val newTopicConfigs = new java.util.HashMap[String, String]()
servers.head.groupCoordinator.groupMetadataTopicConfigs.entrySet().
forEach(e => newTopicConfigs.put(e.getKey.toString, e.getValue.toString))
newTopic.configs(newTopicConfigs)
admin.createTopics(java.util.Arrays.asList(newTopic)).all().get(5, TimeUnit.MINUTES)

createScramCredentials(admin, JaasTestUtils.KAFKA_SCRAM_USER, JaasTestUtils.KAFKA_SCRAM_PASSWORD)
TestUtils.ensureConsistentKRaftMetadata(servers, controllerServer)

servers.head.config.listeners.foreach { endPoint =>
val listenerName = endPoint.listenerName
Expand All @@ -130,7 +143,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
def addProducerConsumer(listenerName: ListenerName, mechanism: String, saslProps: Option[Properties]): Unit = {

val topic = s"${listenerName.value}${producers.size}"
TestUtils.createTopic(zkClient, topic, 2, 2, servers)
admin.createTopics(java.util.Arrays.asList(new NewTopic(topic, 2, 2.toShort))).all().get(5, TimeUnit.MINUTES)
val clientMetadata = ClientMetadata(listenerName, mechanism, topic)

producers(clientMetadata) = TestUtils.createProducer(bootstrapServers, acks = -1,
Expand All @@ -153,6 +166,8 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT

@AfterEach
override def tearDown(): Unit = {
Option(admin).foreach(_.close())
admin = null
producers.values.foreach(_.close())
consumers.values.foreach(_.close())
TestUtils.shutdownServers(servers)
Expand All @@ -165,7 +180,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
* with acks=-1 to ensure that replication is also working.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testProduceConsume(quorum: String, groupProtocol: String): Unit = {
producers.foreach { case (clientMetadata, producer) =>
val producerRecords = (1 to 10).map(i => new ProducerRecord(clientMetadata.topic, s"key$i".getBytes,
Expand Down

0 comments on commit e035f70

Please sign in to comment.