Skip to content

Commit

Permalink
KAFKA-14509; [4/4] Handle includeAuthorizedOperations in ConsumerGrou…
Browse files Browse the repository at this point in the history
…pDescribe API (#16158)

This patch implements the handling of `includeAuthorizedOperations` flag in the ConsumerGroupDescribe API.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
riedelmax authored Jun 10, 2024
1 parent 2533a07 commit 40de07d
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 10 deletions.
12 changes: 12 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3833,6 +3833,7 @@ class KafkaApis(val requestChannel: RequestChannel,

def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = {
val consumerGroupDescribeRequest = request.body[ConsumerGroupDescribeRequest]
val includeAuthorizedOperations = consumerGroupDescribeRequest.data.includeAuthorizedOperations

if (!isConsumerGroupProtocolEnabled()) {
// The API is not supported by the "old" group coordinator (the default). If the
Expand Down Expand Up @@ -3861,6 +3862,17 @@ class KafkaApis(val requestChannel: RequestChannel,
if (exception != null) {
requestHelper.sendMaybeThrottle(request, consumerGroupDescribeRequest.getErrorResponse(exception))
} else {
if (includeAuthorizedOperations) {
results.forEach { groupResult =>
if (groupResult.errorCode == Errors.NONE.code) {
groupResult.setAuthorizedOperations(authHelper.authorizedOperations(
request,
new Resource(ResourceType.GROUP, groupResult.groupId)
))
}
}
}

if (response.groups.isEmpty) {
// If the response is empty, we can directly reuse the results.
response.setGroups(results)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,24 @@
*/
package kafka.server

import kafka.server.GroupCoordinatorBaseRequestTest
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.annotation._
import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.common.ConsumerGroupState
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, DescribedGroup, TopicPartitions}
import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse}
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{Tag, Timeout}

import java.lang.{Byte => JByte}
import scala.jdk.CollectionConverters._

@Timeout(120)
Expand Down Expand Up @@ -116,6 +119,9 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC
val timeoutMs = 5 * 60 * 1000
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)

// Add first group with one member.
var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
Expand Down Expand Up @@ -162,6 +168,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setAssignorName("uniform")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp1Member1Response.memberId)
Expand All @@ -177,6 +184,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC
.setGroupEpoch(grp2Member2Response.memberEpoch)
.setAssignmentEpoch(grp2Member2Response.memberEpoch)
.setAssignorName("range")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member2Response.memberId)
Expand Down Expand Up @@ -219,7 +227,8 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC

val actual = consumerGroupDescribe(
groupIds = List("grp-1", "grp-2"),
version = version.toShort
includeAuthorizedOperations = true,
version = version.toShort,
)

assertEquals(expected, actual)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,13 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {

protected def consumerGroupDescribe(
groupIds: List[String],
includeAuthorizedOperations: Boolean,
version: Short = ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
): List[ConsumerGroupDescribeResponseData.DescribedGroup] = {
val consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder(
new ConsumerGroupDescribeRequestData().setGroupIds(groupIds.asJava)
new ConsumerGroupDescribeRequestData()
.setGroupIds(groupIds.asJava)
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
).build(version)

val consumerGroupDescribeResponse = connectAndReceive[ConsumerGroupDescribeResponse](consumerGroupDescribeRequest)
Expand Down
28 changes: 22 additions & 6 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
Expand All @@ -92,6 +93,7 @@ import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}

import java.lang.{Byte => JByte}
import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.time.Duration
Expand Down Expand Up @@ -7113,8 +7115,9 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
}

@Test
def testConsumerGroupDescribe(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
Expand All @@ -7127,6 +7130,7 @@ class KafkaApisTest extends Logging {

val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
consumerGroupDescribeRequestData.groupIds.addAll(groupIds)
val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build())

Expand All @@ -7141,15 +7145,27 @@ class KafkaApisTest extends Logging {
)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)

val describedGroups = List(
future.complete(List(
new DescribedGroup().setGroupId(groupIds.get(0)),
new DescribedGroup().setGroupId(groupIds.get(1)),
new DescribedGroup().setGroupId(groupIds.get(2))
).asJava
).asJava)

future.complete(describedGroups)
var authorizedOperationsInt = Int.MinValue;
if (includeAuthorizedOperations) {
authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
}

// Can't reuse the above list here because we would not test the implementation in KafkaApis then
val describedGroups = List(
new DescribedGroup().setGroupId(groupIds.get(0)),
new DescribedGroup().setGroupId(groupIds.get(1)),
new DescribedGroup().setGroupId(groupIds.get(2))
).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData()
.setGroups(describedGroups)
.setGroups(describedGroups.asJava)

val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)

Expand Down

0 comments on commit 40de07d

Please sign in to comment.