diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 8d5152a0d21c2..5fbf62ea4db2f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3833,6 +3833,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { val consumerGroupDescribeRequest = request.body[ConsumerGroupDescribeRequest] + val includeAuthorizedOperations = consumerGroupDescribeRequest.data.includeAuthorizedOperations if (!isConsumerGroupProtocolEnabled()) { // The API is not supported by the "old" group coordinator (the default). If the @@ -3861,6 +3862,17 @@ class KafkaApis(val requestChannel: RequestChannel, if (exception != null) { requestHelper.sendMaybeThrottle(request, consumerGroupDescribeRequest.getErrorResponse(exception)) } else { + if (includeAuthorizedOperations) { + results.forEach { groupResult => + if (groupResult.errorCode == Errors.NONE.code) { + groupResult.setAuthorizedOperations(authHelper.authorizedOperations( + request, + new Resource(ResourceType.GROUP, groupResult.groupId) + )) + } + } + } + if (response.groups.isEmpty) { // If the response is empty, we can directly reuse the results. response.setGroups(results) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala index b1f8b8405e743..0e745f33d5bb3 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala @@ -16,9 +16,8 @@ */ package kafka.server -import kafka.server.GroupCoordinatorBaseRequestTest import kafka.test.ClusterInstance -import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation._ import kafka.test.junit.ClusterTestExtensions import kafka.utils.TestUtils import org.apache.kafka.common.ConsumerGroupState @@ -26,11 +25,15 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assign import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse} +import org.apache.kafka.common.resource.ResourceType +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.common.Features import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{Tag, Timeout} +import java.lang.{Byte => JByte} import scala.jdk.CollectionConverters._ @Timeout(120) @@ -116,6 +119,9 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC val timeoutMs = 5 * 60 * 1000 val clientId = "client-id" val clientHost = "/127.0.0.1" + val authorizedOperationsInt = Utils.to32BitField( + AclEntry.supportedOperations(ResourceType.GROUP).asScala + .map(_.code.asInstanceOf[JByte]).asJava) // Add first group with one member. var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null @@ -162,6 +168,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC .setGroupEpoch(1) .setAssignmentEpoch(1) .setAssignorName("uniform") + .setAuthorizedOperations(authorizedOperationsInt) .setMembers(List( new ConsumerGroupDescribeResponseData.Member() .setMemberId(grp1Member1Response.memberId) @@ -177,6 +184,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC .setGroupEpoch(grp2Member2Response.memberEpoch) .setAssignmentEpoch(grp2Member2Response.memberEpoch) .setAssignorName("range") + .setAuthorizedOperations(authorizedOperationsInt) .setMembers(List( new ConsumerGroupDescribeResponseData.Member() .setMemberId(grp2Member2Response.memberId) @@ -219,7 +227,8 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC val actual = consumerGroupDescribe( groupIds = List("grp-1", "grp-2"), - version = version.toShort + includeAuthorizedOperations = true, + version = version.toShort, ) assertEquals(expected, actual) diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index 847bdf3225f54..9fad21476e73c 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -421,10 +421,13 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { protected def consumerGroupDescribe( groupIds: List[String], + includeAuthorizedOperations: Boolean, version: Short = ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled) ): List[ConsumerGroupDescribeResponseData.DescribedGroup] = { val consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder( - new ConsumerGroupDescribeRequestData().setGroupIds(groupIds.asJava) + new ConsumerGroupDescribeRequestData() + .setGroupIds(groupIds.asJava) + .setIncludeAuthorizedOperations(includeAuthorizedOperations) ).build(version) val consumerGroupDescribeResponse = connectAndReceive[ConsumerGroupDescribeResponse](consumerGroupDescribeRequest) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 16b12e3fc5ff2..208f9d59e1b6f 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -76,6 +76,7 @@ import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils} import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.raft.QuorumConfig +import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} @@ -92,6 +93,7 @@ import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} +import java.lang.{Byte => JByte} import java.net.InetAddress import java.nio.charset.StandardCharsets import java.time.Duration @@ -7113,8 +7115,9 @@ class KafkaApisTest extends Logging { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } - @Test - def testConsumerGroupDescribe(): Unit = { + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = { metadataCache = mock(classOf[KRaftMetadataCache]) when(metadataCache.features()).thenReturn { new FinalizedFeatures( @@ -7127,6 +7130,7 @@ class KafkaApisTest extends Logging { val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + .setIncludeAuthorizedOperations(includeAuthorizedOperations) consumerGroupDescribeRequestData.groupIds.addAll(groupIds) val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) @@ -7141,15 +7145,27 @@ class KafkaApisTest extends Logging { ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) - val describedGroups = List( + future.complete(List( new DescribedGroup().setGroupId(groupIds.get(0)), new DescribedGroup().setGroupId(groupIds.get(1)), new DescribedGroup().setGroupId(groupIds.get(2)) - ).asJava + ).asJava) - future.complete(describedGroups) + var authorizedOperationsInt = Int.MinValue; + if (includeAuthorizedOperations) { + authorizedOperationsInt = Utils.to32BitField( + AclEntry.supportedOperations(ResourceType.GROUP).asScala + .map(_.code.asInstanceOf[JByte]).asJava) + } + + // Can't reuse the above list here because we would not test the implementation in KafkaApis then + val describedGroups = List( + new DescribedGroup().setGroupId(groupIds.get(0)), + new DescribedGroup().setGroupId(groupIds.get(1)), + new DescribedGroup().setGroupId(groupIds.get(2)) + ).map(group => group.setAuthorizedOperations(authorizedOperationsInt)) val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() - .setGroups(describedGroups) + .setGroups(describedGroups.asJava) val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)