Skip to content

Commit

Permalink
MINOR: convert GssapiAuthenticationTest to KRaft (#17786)
Browse files Browse the repository at this point in the history
Reviewers: Matthias J. Sax <[email protected]>, Justine Olshan <[email protected]>
  • Loading branch information
cmccabe authored Nov 14, 2024
1 parent 401b347 commit 6e9c56a
Showing 1 changed file with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ import org.apache.kafka.common.security.kerberos.KerberosLogin
import org.apache.kafka.common.utils.{LogContext, MockTime}
import org.apache.kafka.network.SocketServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.junit.jupiter.params.provider.{MethodSource, ValueSource}

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -69,15 +69,15 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
serverConfig.put(SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG, failedAuthenticationDelayMs.toString)
super.setUp(testInfo)
serverAddr = new InetSocketAddress("localhost",
servers.head.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)))
brokers.head.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)))

clientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name)
clientConfig.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism)
clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, jaasClientLoginModule(kafkaClientSaslMechanism))
clientConfig.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, "5000")

// create the test topic with all the brokers as replicas
createTopic(topic, 2, brokerCount)
createTopic(topic, 2, brokerCount, listenerName = listenerName, adminClientConfig = adminClientConfig)
}

@AfterEach
Expand All @@ -92,15 +92,16 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* Tests that Kerberos replay error `Request is a replay (34)` is not handled as an authentication exception
* since replay detection used to detect DoS attacks may occasionally reject valid concurrent requests.
*/
@Test
def testRequestIsAReplay(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testRequestIsAReplay(quorum: String): Unit = {
val successfulAuthsPerThread = 10
val futures = (0 until numThreads).map(_ => executor.submit(new Runnable {
override def run(): Unit = verifyRetriableFailuresDuringAuthentication(successfulAuthsPerThread)
}))
futures.foreach(_.get(60, TimeUnit.SECONDS))
assertEquals(0, TestUtils.totalMetricValue(servers.head, "failed-authentication-total"))
val successfulAuths = TestUtils.totalMetricValue(servers.head, "successful-authentication-total")
assertEquals(0, TestUtils.totalMetricValue(brokers.head, "failed-authentication-total"))
val successfulAuths = TestUtils.totalMetricValue(brokers.head, "successful-authentication-total")
assertTrue(successfulAuths > successfulAuthsPerThread * numThreads, "Too few authentications: " + successfulAuths)
}

Expand All @@ -109,8 +110,9 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* are able to connect after the second re-login. Verifies that logout is performed only once
* since duplicate logouts without successful login results in NPE from Java 9 onwards.
*/
@Test
def testLoginFailure(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testLoginFailure(quorum: String): Unit = {
val selector = createSelectorWithRelogin()
try {
val login = TestableKerberosLogin.instance
Expand All @@ -132,8 +134,9 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* is performed when credentials are unavailable between logout and login, we handle it as a
* transient error and not an authentication failure so that clients may retry.
*/
@Test
def testReLogin(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testReLogin(quorum: String): Unit = {
val selector = createSelectorWithRelogin()
try {
val login = TestableKerberosLogin.instance
Expand Down Expand Up @@ -163,8 +166,9 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* Tests that Kerberos error `Server not found in Kerberos database (7)` is handled
* as a fatal authentication failure.
*/
@Test
def testServerNotFoundInKerberosDatabase(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testServerNotFoundInKerberosDatabase(quorum: String): Unit = {
val jaasConfig = clientConfig.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
val invalidServiceConfig = jaasConfig.replace("serviceName=\"kafka\"", "serviceName=\"invalid-service\"")
clientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, invalidServiceConfig)
Expand Down

0 comments on commit 6e9c56a

Please sign in to comment.