KAFKA-18399 Remove ZooKeeper from KafkaApis (9/N): ALTER_CLIENT_QUOTAS and ALLOCATE_PRODUCER_IDS (#18465)

Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
mingdaoy authored Jan 14, 2025
1 parent 730272e commit 9f95597
Showing 2 changed files with 1 addition and 99 deletions.
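
For context: this commit is part of the KAFKA-18399 series that removes the ZooKeeper code paths from KafkaApis. Under KRaft, a broker that receives ALTER_CLIENT_QUOTAS forwards the request to the controller instead of applying it locally, and ALLOCATE_PRODUCER_IDS is served by the controller listener rather than by the broker, so the two ZooKeeper-era handlers deleted below are no longer needed on the broker. The client-facing path is unchanged; the following minimal sketch (illustrative only, not part of this commit; the bootstrap address, principal name, and quota value are assumptions) alters a user quota through the public Admin API:

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
    import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}

    object AlterUserQuotaSketch extends App {
      val props = new Properties()
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed address
      val admin = Admin.create(props)
      try {
        // Quota entity: the alteration targets principal "user" (assumed name).
        val entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
        // Set a producer byte-rate quota of 1 MiB/s for that user.
        val op = new ClientQuotaAlteration.Op("producer_byte_rate", 1048576.0)
        val alteration = new ClientQuotaAlteration(entity, Collections.singletonList(op))
        // The receiving broker forwards this to the KRaft controller, which persists the quota
        // in the cluster metadata log.
        admin.alterClientQuotas(Collections.singletonList(alteration)).all().get()
      } finally {
        admin.close()
      }
    }
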
core/src/main/scala/kafka/server/KafkaApis.scala (0 additions, 48 deletions)
@@ -222,7 +222,6 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.UNREGISTER_BROKER => forwardToController(request)
case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
- case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
case ApiKeys.DESCRIBE_QUORUM => forwardToController(request)
case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
@@ -2507,37 +2506,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

- def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = {
-   val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-   val alterClientQuotasRequest = request.body[AlterClientQuotasRequest]
-
-   if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
-     val result = zkSupport.adminManager.alterClientQuotas(alterClientQuotasRequest.entries.asScala,
-       alterClientQuotasRequest.validateOnly)
-
-     val entriesData = result.iterator.map { case (quotaEntity, apiError) =>
-       val entityData = quotaEntity.entries.asScala.iterator.map { case (key, value) =>
-         new AlterClientQuotasResponseData.EntityData()
-           .setEntityType(key)
-           .setEntityName(value)
-       }.toBuffer
-
-       new AlterClientQuotasResponseData.EntryData()
-         .setErrorCode(apiError.error.code)
-         .setErrorMessage(apiError.message)
-         .setEntity(entityData.asJava)
-     }.toBuffer
-
-     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-       new AlterClientQuotasResponse(new AlterClientQuotasResponseData()
-         .setThrottleTimeMs(requestThrottleMs)
-         .setEntries(entriesData.asJava)))
-   } else {
-     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-       alterClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
-   }
- }
-
def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
val describeUserScramCredentialsRequest = request.body[DescribeUserScramCredentialsRequest]

@@ -2695,22 +2663,6 @@ class KafkaApis(val requestChannel: RequestChannel,
new ListTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs)))
}

- def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = {
-   val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-   authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-
-   val allocateProducerIdsRequest = request.body[AllocateProducerIdsRequest]
-
-   if (!zkSupport.controller.isActive)
-     requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
-       allocateProducerIdsRequest.getErrorResponse(throttleTimeMs, Errors.NOT_CONTROLLER.exception))
-   else
-     zkSupport.controller.allocateProducerIds(allocateProducerIdsRequest.data, producerIdsResponse =>
-       requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
-         new AllocateProducerIdsResponse(producerIdsResponse.setThrottleTimeMs(throttleTimeMs)))
-     )
- }
-
private def groupVersion(): GroupVersion = {
GroupVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(GroupVersion.FEATURE_NAME, 0.toShort))
}
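Background on the second removal above: ALLOCATE_PRODUCER_IDS is the RPC a broker uses to obtain a block of producer IDs (needed for idempotent and transactional producers) from a single cluster-wide authority. Under KRaft that authority is the controller quorum, so the broker-side handler only made sense while a ZooKeeper-backed controller could run inside KafkaApis. The toy sketch below (not Kafka source; the class names and block size are made up) illustrates the block-allocation idea: one authority advances a counter and hands out non-overlapping ranges.

    // Toy model of producer-ID block allocation; stands in for what the KRaft controller does
    // when it answers ALLOCATE_PRODUCER_IDS. Not Kafka's implementation.
    final case class ProducerIdBlock(firstId: Long, size: Int) {
      def lastId: Long = firstId + size - 1
    }

    // The single authority allowed to advance the global counter (the controller role).
    final class ToyProducerIdAuthority(blockSize: Int = 1000) {
      private var nextFirstId: Long = 0L
      def allocateBlock(): ProducerIdBlock = synchronized {
        val block = ProducerIdBlock(nextFirstId, blockSize)
        nextFirstId += blockSize
        block
      }
    }

    object ProducerIdBlockDemo extends App {
      val authority = new ToyProducerIdAuthority()
      // Two "brokers" each request a block; the ranges never overlap.
      val brokerA = authority.allocateBlock()
      val brokerB = authority.allocateBlock()
      println(s"broker A: ${brokerA.firstId}..${brokerA.lastId}")
      println(s"broker B: ${brokerB.firstId}..${brokerB.lastId}")
    }

Handing out blocks rather than single IDs keeps this RPC off the hot path: a broker only returns to the controller once its current block is exhausted.
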
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala (1 addition, 51 deletions)
@@ -35,7 +35,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, UnsupportedVersionException}
- import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
+ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
@@ -61,7 +61,6 @@ import org.apache.kafka.common.message._
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil}
- import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
@@ -730,48 +729,6 @@ class KafkaApisTest extends Logging {
assertEquals(expectedResults, responseMap)
}

- @Test
- def testAlterClientQuotasWithAuthorizer(): Unit = {
-   val authorizer: Authorizer = mock(classOf[Authorizer])
-
-   authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
-     Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
-
-   val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
-   val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
-
-   val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion, clientId, 0)
-
-   val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
-     .build(requestHeader.apiVersion)
-   val request = buildRequest(alterClientQuotasRequest,
-     fromPrivilegedListener = true, requestHeader = Option(requestHeader))
-
-   when(controller.isActive).thenReturn(true)
-   when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-     anyLong)).thenReturn(0)
-   kafkaApis = createKafkaApis(authorizer = Some(authorizer))
-   kafkaApis.handleAlterClientQuotasRequest(request)
-
-   val capturedResponse = verifyNoThrottling[AlterClientQuotasResponse](request)
-   verifyAlterClientQuotaResult(capturedResponse, Map(quotaEntity -> Errors.CLUSTER_AUTHORIZATION_FAILED))
-
-   verify(authorizer).authorize(any(), any())
-   verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), anyLong)
- }
-
- private def verifyAlterClientQuotaResult(response: AlterClientQuotasResponse,
-                                          expected: Map[ClientQuotaEntity, Errors]): Unit = {
-   val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap
-   response.complete(futures.asJava)
-   futures.foreach {
-     case (entity, future) =>
-       future.whenComplete((_, thrown) =>
-         assertEquals(thrown, expected(entity).exception())
-       ).isDone
-   }
- }
-
@ParameterizedTest
@CsvSource(value = Array("0,1500", "1500,0", "3000,1000"))
def testKRaftControllerThrottleTimeEnforced(
@@ -10027,13 +9984,6 @@
setResourceType(BROKER_LOGGER.id()))),
response.data())
}

- @Test
- def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
-   metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
-   kafkaApis = createKafkaApis(raftSupport = true)
-   verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
- }

@Test
def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
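The deleted tests exercised the ZooKeeper handler directly and checked that a KRaft broker would always forward this API; with handleAlterClientQuotasRequest gone there is no broker-side method left to call, so there is nothing to assert at this level. One way to sanity-check quota handling end to end against a running KRaft cluster (illustrative only, not a test added by this commit; the bootstrap address and principal name are assumptions) is to read the stored quotas back through the Admin API:

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
    import org.apache.kafka.common.quota.{ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}

    object DescribeUserQuotaSketch extends App {
      val props = new Properties()
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed address
      val admin = Admin.create(props)
      try {
        // Fetch the quotas currently stored for principal "user" (assumed name).
        val filter = ClientQuotaFilter.containsOnly(
          Collections.singletonList(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user")))
        val quotas = admin.describeClientQuotas(filter).entities().get()
        quotas.forEach((entity, values) => println(s"$entity -> $values"))
      } finally {
        admin.close()
      }
    }
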
