Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16974 #3

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/test/java/kafka/test/annotation/ClusterTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
Expand Down Expand Up @@ -49,6 +50,7 @@
@Target({METHOD})
@Retention(RUNTIME)
@TestTemplate
@Timeout(60)
@Tag("integration")
public @interface ClusterTemplate {
/**
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/java/kafka/test/annotation/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
Expand All @@ -34,6 +35,7 @@
@Target({METHOD})
@Retention(RUNTIME)
@TestTemplate
@Timeout(60)
@Tag("integration")
public @interface ClusterTest {
Type[] types() default {};
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/java/kafka/test/annotation/ClusterTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
Expand All @@ -31,6 +32,7 @@
@Target({METHOD})
@Retention(RUNTIME)
@TestTemplate
@Timeout(60)
@Tag("integration")
public @interface ClusterTests {
ClusterTest[] value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package kafka.api
import java.util
import java.util.concurrent._
import java.util.Properties

import com.yammer.metrics.core.Gauge
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.TestUtils
Expand All @@ -29,9 +28,12 @@ import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrinci
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.server.authorizer._
import org.apache.kafka.common.network.ConnectionMode
import org.apache.kafka.metadata.authorizer.{ClusterMetadataAuthorizer, StandardAuthorizer}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import scala.jdk.CollectionConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -84,6 +86,46 @@ object SslAdminIntegrationTest {
}
}

class TestableStandardAuthorizer extends StandardAuthorizer with ClusterMetadataAuthorizer {

override def createAcls(requestContext: AuthorizableRequestContext,
aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = {
lastUpdateRequestContext = Some(requestContext)
execute[AclCreateResult](aclBindings.size, () => super.createAcls(requestContext, aclBindings))
}

override def deleteAcls(requestContext: AuthorizableRequestContext,
aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: CompletionStage[AclDeleteResult]] = {
lastUpdateRequestContext = Some(requestContext)
execute[AclDeleteResult](aclBindingFilters.size, () => super.deleteAcls(requestContext, aclBindingFilters))
}

private def execute[T](batchSize: Int, action: () => util.List[_ <: CompletionStage[T]]): util.List[CompletableFuture[T]] = {
val futures = (0 until batchSize).map(_ => new CompletableFuture[T]).toList
val runnable = new Runnable {
override def run(): Unit = {
semaphore.foreach(_.acquire())
try {
action.apply().asScala.zip(futures).foreach { case (baseFuture, resultFuture) =>
try {
resultFuture.complete(baseFuture.toCompletableFuture.get())
} catch {
case e: Throwable => resultFuture.completeExceptionally(e)
}
}
} finally {
semaphore.foreach(_.release())
}
}
}
executor match {
case Some(executorService) => executorService.submit(runnable)
case None => runnable.run()
}
futures.asJava
}
}

class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
private val Pattern = "O=A (.*?),CN=(.*?)".r

Expand All @@ -110,6 +152,7 @@ object SslAdminIntegrationTest {

class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
override val zkAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName
override val kraftAuthorizerClassName: String = classOf[SslAdminIntegrationTest.TestableStandardAuthorizer].getName

this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required")
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[SslAdminIntegrationTest.TestPrincipalBuilder].getName)
Expand All @@ -134,13 +177,15 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
super.tearDown()
}

@Test
def testAclUpdatesUsingSynchronousAuthorizer(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAclUpdatesUsingSynchronousAuthorizer(quorum: String): Unit = {
verifyAclUpdates()
}

@Test
def testAclUpdatesUsingAsynchronousAuthorizer(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testAclUpdatesUsingAsynchronousAuthorizer(quorum: String): Unit = {
SslAdminIntegrationTest.executor = Some(Executors.newSingleThreadExecutor)
verifyAclUpdates()
}
Expand All @@ -149,31 +194,37 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
* Verify that ACL updates using synchronous authorizer are performed synchronously
* on request threads without any performance overhead introduced by a purgatory.
*/
@Test
def testSynchronousAuthorizerAclUpdatesBlockRequestThreads(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testSynchronousAuthorizerAclUpdatesBlockRequestThreads(quorum: String): Unit = {
val testSemaphore = new Semaphore(0)
SslAdminIntegrationTest.semaphore = Some(testSemaphore)
waitForNoBlockedRequestThreads()

// Queue requests until all threads are blocked. ACL create requests are sent to least loaded
// node, so we may need more than `numRequestThreads` requests to block all threads.
val aclFutures = mutable.Buffer[CreateAclsResult]()
while (blockedRequestThreads.size < numRequestThreads) {
// TODO: fix comment
// createAcl under kraft only handled in controller servers, brokers won't handle it
// so here we only consider number of controller io threads
val numReqThreads = if (isKRaftTest()) controllerServers.head.config.numIoThreads * controllerServers.size else numRequestThreads
while (blockedRequestThreads.size < numReqThreads) {
aclFutures += createAdminClient.createAcls(List(acl2).asJava)
assertTrue(aclFutures.size < numRequestThreads * 10,
s"Request threads not blocked numRequestThreads=$numRequestThreads blocked=$blockedRequestThreads")
// TODO: add comment
assertTrue(aclFutures.size < numReqThreads * 100,
s"Request threads not blocked numRequestThreads=$numReqThreads blocked=$blockedRequestThreads aclFutures=${aclFutures.size}")
}
assertEquals(0, purgatoryMetric("NumDelayedOperations"))
assertEquals(0, purgatoryMetric("PurgatorySize"))

// Verify that operations on other clients are blocked
val describeFuture = createAdminClient.describeCluster().clusterId()
assertFalse(describeFuture.isDone)
val listPartitionReassignmentsFuture = createAdminClient.listPartitionReassignments().reassignments()
assertFalse(listPartitionReassignmentsFuture.isDone)

// Release the semaphore and verify that all requests complete
testSemaphore.release(aclFutures.size)
waitForNoBlockedRequestThreads()
assertNotNull(describeFuture.get(10, TimeUnit.SECONDS))
assertNotNull(listPartitionReassignmentsFuture.get(10, TimeUnit.SECONDS))
// If any of the requests time out since we were blocking the threads earlier, retry the request.
val numTimedOut = aclFutures.count { future =>
try {
Expand All @@ -196,8 +247,9 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
* Verify that ACL updates using an asynchronous authorizer are completed asynchronously
* using a purgatory, enabling other requests to be processed even when ACL updates are blocked.
*/
@Test
def testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAsynchronousAuthorizerAclUpdatesDontBlockRequestThreads(quorum: String): Unit = {
SslAdminIntegrationTest.executor = Some(Executors.newSingleThreadExecutor)
val testSemaphore = new Semaphore(0)
SslAdminIntegrationTest.semaphore = Some(testSemaphore)
Expand All @@ -208,7 +260,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
waitForNoBlockedRequestThreads()
assertTrue(aclFutures.forall(future => !future.all.isDone))
// Other requests should succeed even though ACL updates are blocked
assertNotNull(createAdminClient.describeCluster().clusterId().get(10, TimeUnit.SECONDS))
assertNotNull(createAdminClient.listPartitionReassignments().reassignments().get(10, TimeUnit.SECONDS))
TestUtils.waitUntilTrue(() => purgatoryMetric("PurgatorySize") > 0, "PurgatorySize metrics not updated")
TestUtils.waitUntilTrue(() => purgatoryMetric("NumDelayedOperations") > 0, "NumDelayedOperations metrics not updated")

Expand All @@ -224,8 +276,12 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, SslAdminIntegrationTest.clientCn)

def validateRequestContext(context: AuthorizableRequestContext, apiKey: ApiKeys): Unit = {
assertEquals(SecurityProtocol.SSL, context.securityProtocol)
assertEquals("SSL", context.listenerName)
// kraft forward createACL request from broker to controller hence the security protocol is missing
// after forwarding and become PLAINTEXT, assert SSL
if (!isKRaftTest()) {
assertEquals(SecurityProtocol.SSL, context.securityProtocol)
assertEquals("SSL", context.listenerName)
}
assertEquals(clientPrincipal, context.principal)
assertEquals(apiKey.id.toInt, context.requestType)
assertEquals(apiKey.latestVersion.toInt, context.requestVersion)
Expand Down Expand Up @@ -266,7 +322,10 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
requestThreads.filter(_.getState == Thread.State.WAITING).toList
}

private def numRequestThreads = servers.head.config.numIoThreads * servers.size
private def numRequestThreads = {
if (isKRaftTest()) brokers.head.config.numIoThreads * (brokers.size + controllerServers.size)
else servers.head.config.numIoThreads * servers.size
}

private def waitForNoBlockedRequestThreads(): Unit = {
val (blockedThreads, _) = TestUtils.computeUntilTrue(blockedRequestThreads)(_.isEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.{KRaftConfigs, ZkConfigs}
import org.junit.jupiter.api.Assertions.{assertThrows, fail}
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith

import java.util.Optional
Expand All @@ -41,7 +40,6 @@ import scala.jdk.CollectionConverters._
* failure paths is to use timeouts. See {@link unit.kafka.server.BrokerRegistrationRequestTest} for integration test
* of just the broker registration path.
*/
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class KafkaServerKRaftRegistrationTest {

Expand Down
42 changes: 0 additions & 42 deletions core/src/test/scala/other/kafka/TestTruncate.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.apache.kafka.server.common.ProducerIdsBlock
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith

@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith

import java.util
Expand All @@ -44,7 +43,6 @@ import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
/**
* This test simulates a broker registering with the KRaft quorum under different configurations.
*/
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class BrokerRegistrationRequestTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith

import java.lang.{Byte => JByte}
import scala.jdk.CollectionConverters._

@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1)
class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@ import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, Consumer
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull}
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith

import scala.collection.Map
import scala.jdk.CollectionConverters._

@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import org.apache.kafka.coordinator.group.{Group, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith

@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith

@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith

import scala.jdk.CollectionConverters._

@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(types = Array(Type.KRAFT))
class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
Expand Down
Loading
Loading