Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
brandboat committed Sep 22, 2024
1 parent bf450eb commit 4ed27ad
Showing 1 changed file with 79 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package kafka.api
import java.util
import java.util.concurrent._
import java.util.Properties

import com.yammer.metrics.core.Gauge
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.TestUtils
Expand All @@ -29,9 +28,12 @@ import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrinci
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.server.authorizer._
import org.apache.kafka.common.network.ConnectionMode
import org.apache.kafka.metadata.authorizer.{ClusterMetadataAuthorizer, StandardAuthorizer}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import scala.jdk.CollectionConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -84,6 +86,46 @@ object SslAdminIntegrationTest {
}
}

class TestableStandardAuthorizer extends StandardAuthorizer with ClusterMetadataAuthorizer {

override def createAcls(requestContext: AuthorizableRequestContext,
aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = {
lastUpdateRequestContext = Some(requestContext)
execute[AclCreateResult](aclBindings.size, () => super.createAcls(requestContext, aclBindings))
}

override def deleteAcls(requestContext: AuthorizableRequestContext,
aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: CompletionStage[AclDeleteResult]] = {
lastUpdateRequestContext = Some(requestContext)
execute[AclDeleteResult](aclBindingFilters.size, () => super.deleteAcls(requestContext, aclBindingFilters))
}

private def execute[T](batchSize: Int, action: () => util.List[_ <: CompletionStage[T]]): util.List[CompletableFuture[T]] = {
val futures = (0 until batchSize).map(_ => new CompletableFuture[T]).toList
val runnable = new Runnable {
override def run(): Unit = {
semaphore.foreach(_.acquire())
try {
action.apply().asScala.zip(futures).foreach { case (baseFuture, resultFuture) =>
try {
resultFuture.complete(baseFuture.toCompletableFuture.get())
} catch {
case e: Throwable => resultFuture.completeExceptionally(e)
}
}
} finally {
semaphore.foreach(_.release())
}
}
}
executor match {
case Some(executorService) => executorService.submit(runnable)
case None => runnable.run()
}
futures.asJava
}
}

class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
private val Pattern = "O=A (.*?),CN=(.*?)".r

Expand All @@ -110,6 +152,7 @@ object SslAdminIntegrationTest {

class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
override val zkAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName
override val kraftAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableStandardAuthorizer].getName

this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required")
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName)
Expand All @@ -134,13 +177,15 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
super.tearDown()
}

@Test
def testAclUpdatesUsingSynchronousAuthorizer(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAclUpdatesUsingSynchronousAuthorizer(quorum: String): Unit = {
verifyAclUpdates()
}

@Test
def testAclUpdatesUsingAsynchronousAuthorizer(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAclUpdatesUsingAsynchronousAuthorizer(quorum: String): Unit = {
SslAdminIntegrationTest.executor = Some(Executors.newSingleThreadExecutor)
verifyAclUpdates()
}
Expand All @@ -149,31 +194,37 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
* Verify that ACL updates using synchronous authorizer are performed synchronously
* on request threads without any performance overhead introduced by a purgatory.
*/
@Test
def testSynchronousAuthorizerAclUpdatesBlockRequestThreads(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testSynchronousAuthorizerAclUpdatesBlockRequestThreads(quorum: String): Unit = {
val testSemaphore = new Semaphore(0)
SslAdminIntegrationTest.semaphore = Some(testSemaphore)
waitForNoBlockedRequestThreads()

// Queue requests until all threads are blocked. ACL create requests are sent to least loaded
// node, so we may need more than `numRequestThreads` requests to block all threads.
val aclFutures = mutable.Buffer[CreateAclsResult]()
while (blockedRequestThreads.size < numRequestThreads) {
// TODO: fix comment
// createAcl under kraft only handled in controller servers, brokers won't handle it
// so here we only consider number of controller io threads
val numReqThreads = if (isKRaftTest()) controllerServers.head.config.numIoThreads * controllerServers.size else numRequestThreads
while (blockedRequestThreads.size < numReqThreads) {
aclFutures += createAdminClient.createAcls(List(acl2).asJava)
assertTrue(aclFutures.size < numRequestThreads * 10,
s"Request threads not blocked numRequestThreads=$numRequestThreads blocked=$blockedRequestThreads")
// TODO: add comment
assertTrue(aclFutures.size < numReqThreads * 100,
s"Request threads not blocked numRequestThreads=$numReqThreads blocked=$blockedRequestThreads aclFutures=${aclFutures.size}")
}
assertEquals(0, purgatoryMetric("NumDelayedOperations"))
assertEquals(0, purgatoryMetric("PurgatorySize"))

// Verify that operations on other clients are blocked
val describeFuture = createAdminClient.describeCluster().clusterId()
assertFalse(describeFuture.isDone)
val listPartitionReassignmentsFuture = createAdminClient.listPartitionReassignments().reassignments()
assertFalse(listPartitionReassignmentsFuture.isDone)

// Release the semaphore and verify that all requests complete
testSemaphore.release(aclFutures.size)
waitForNoBlockedRequestThreads()
assertNotNull(describeFuture.get(10, TimeUnit.SECONDS))
assertNotNull(listPartitionReassignmentsFuture.get(10, TimeUnit.SECONDS))
// If any of the requests time out since we were blocking the threads earlier, retry the request.
val numTimedOut = aclFutures.count { future =>
try {
Expand All @@ -196,8 +247,9 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
* Verify that ACL updates using an asynchronous authorizer are completed asynchronously
* using a purgatory, enabling other requests to be processed even when ACL updates are blocked.
*/
@Test
def testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads(quorum: String): Unit = {
SslAdminIntegrationTest.executor = Some(Executors.newSingleThreadExecutor)
val testSemaphore = new Semaphore(0)
SslAdminIntegrationTest.semaphore = Some(testSemaphore)
Expand All @@ -208,7 +260,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
waitForNoBlockedRequestThreads()
assertTrue(aclFutures.forall(future => !future.all.isDone))
// Other requests should succeed even though ACL updates are blocked
assertNotNull(createAdminClient.describeCluster().clusterId().get(10, TimeUnit.SECONDS))
assertNotNull(createAdminClient.listPartitionReassignments().reassignments().get(10, TimeUnit.SECONDS))
TestUtils.waitUntilTrue(() => purgatoryMetric("PurgatorySize") > 0, "PurgatorySize metrics not updated")
TestUtils.waitUntilTrue(() => purgatoryMetric("NumDelayedOperations") > 0, "NumDelayedOperations metrics not updated")

Expand All @@ -224,8 +276,12 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.clientCn)

def validateRequestContext(context: AuthorizableRequestContext, apiKey: ApiKeys): Unit = {
assertEquals(SecurityProtocol.SSL, context.securityProtocol)
assertEquals("SSL", context.listenerName)
// kraft forward createACL request from broker to controller hence the security protocol is missing
// after forwarding and become PLAINTEXT, assert SSL
if (!isKRaftTest()) {
assertEquals(SecurityProtocol.SSL, context.securityProtocol)
assertEquals("SSL", context.listenerName)
}
assertEquals(clientPrincipal, context.principal)
assertEquals(apiKey.id.toInt, context.requestType)
assertEquals(apiKey.latestVersion.toInt, context.requestVersion)
Expand Down Expand Up @@ -266,7 +322,10 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
requestThreads.filter(_.getState == Thread.State.WAITING).toList
}

private def numRequestThreads = servers.head.config.numIoThreads * servers.size
private def numRequestThreads = {
if (isKRaftTest()) brokers.head.config.numIoThreads * (brokers.size + controllerServers.size)
else servers.head.config.numIoThreads * servers.size
}

private def waitForNoBlockedRequestThreads(): Unit = {
val (blockedThreads, _) = TestUtils.computeUntilTrue(blockedRequestThreads)(_.isEmpty)
Expand Down

0 comments on commit 4ed27ad

Please sign in to comment.