diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index d7d525e443171..eca6a8b45c9ed 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2245,7 +2245,7 @@ void handleResponse(AbstractResponse abstractResponse) { continue; } - TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes); + TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes, options.includeAuthorizedOperations()); if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) { // Add the partitions for the cursor topic of the previous batch. @@ -2408,14 +2408,16 @@ void handleFailure(Throwable throwable) { private TopicDescription getTopicDescriptionFromDescribeTopicsResponseTopic( DescribeTopicPartitionsResponseTopic topic, - Map nodes + Map nodes, + boolean includeAuthorizedOperations ) { List partitionInfos = topic.partitions(); List partitions = new ArrayList<>(partitionInfos.size()); for (DescribeTopicPartitionsResponsePartition partitionInfo : partitionInfos) { partitions.add(DescribeTopicPartitionsResponse.partitionToTopicPartitionInfo(partitionInfo, nodes)); } - return new TopicDescription(topic.name(), topic.isInternal(), partitions, validAclOperations(topic.topicAuthorizedOperations()), topic.topicId()); + Set authorisedOperations = includeAuthorizedOperations ? validAclOperations(topic.topicAuthorizedOperations()) : null; + return new TopicDescription(topic.name(), topic.isInternal(), partitions, authorisedOperations, topic.topicId()); } private TopicDescription getTopicDescriptionFromCluster(Cluster cluster, String topicName, Uuid topicId, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index f7b27dcb868ad..31c2cc7fcf45e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -1474,6 +1474,7 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiBasic() { assertEquals(0, topicDescription.partitions().get(0).partition()); assertEquals(1, topicDescription.partitions().get(1).partition()); topicDescription = topicDescriptions.get(topicName1); + assertNull(topicDescription.authorizedOperations()); assertEquals(1, topicDescription.partitions().size()); } catch (Exception e) { fail("describe using DescribeTopics API should not fail", e); @@ -1481,6 +1482,77 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiBasic() { } } + @Test + public void testDescribeTopicPartitionsApiWithAuthorizedOps() throws ExecutionException, InterruptedException { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String topicName0 = "test-0"; + Uuid topicId = Uuid.randomUuid(); + + int authorisedOperations = Utils.to32BitField(Utils.mkSet(AclOperation.DESCRIBE.code(), AclOperation.ALTER.code())); + env.kafkaClient().prepareResponse( + prepareDescribeClusterResponse(0, + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + 2, + authorisedOperations) + ); + + DescribeTopicPartitionsResponseData responseData = new DescribeTopicPartitionsResponseData(); + responseData.topics().add(new DescribeTopicPartitionsResponseTopic() + .setErrorCode((short) 0) + .setTopicId(topicId) + .setName(topicName0) + .setIsInternal(false) + .setTopicAuthorizedOperations(authorisedOperations)); + env.kafkaClient().prepareResponse(new DescribeTopicPartitionsResponse(responseData)); + + DescribeTopicsResult result = env.adminClient().describeTopics( + singletonList(topicName0), new DescribeTopicsOptions().includeAuthorizedOperations(true) + ); + + Map topicDescriptions = result.allTopicNames().get(); + TopicDescription topicDescription = topicDescriptions.get(topicName0); + assertEquals(new HashSet<>(asList(AclOperation.DESCRIBE, AclOperation.ALTER)), + topicDescription.authorizedOperations()); + } + } + + @Test + public void testDescribeTopicPartitionsApiWithoutAuthorizedOps() throws ExecutionException, InterruptedException { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String topicName0 = "test-0"; + Uuid topicId = Uuid.randomUuid(); + + int authorisedOperations = Utils.to32BitField(Utils.mkSet(AclOperation.DESCRIBE.code(), AclOperation.ALTER.code())); + env.kafkaClient().prepareResponse( + prepareDescribeClusterResponse(0, + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + 2, + authorisedOperations) + ); + + DescribeTopicPartitionsResponseData responseData = new DescribeTopicPartitionsResponseData(); + responseData.topics().add(new DescribeTopicPartitionsResponseTopic() + .setErrorCode((short) 0) + .setTopicId(topicId) + .setName(topicName0) + .setIsInternal(false) + .setTopicAuthorizedOperations(authorisedOperations)); + env.kafkaClient().prepareResponse(new DescribeTopicPartitionsResponse(responseData)); + + DescribeTopicsResult result = env.adminClient().describeTopics( + singletonList(topicName0), new DescribeTopicsOptions().includeAuthorizedOperations(false) + ); + + Map topicDescriptions = result.allTopicNames().get(); + TopicDescription topicDescription = topicDescriptions.get(topicName0); + assertNull(topicDescription.authorizedOperations()); + } + } + @SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"}) @Test public void testDescribeTopicsWithDescribeTopicPartitionsApiEdgeCase() { @@ -1554,6 +1626,7 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiEdgeCase() { assertEquals(2, topicDescription.partitions().size()); topicDescription = topicDescriptions.get(topicName2); assertEquals(2, topicDescription.partitions().size()); + assertNull(topicDescription.authorizedOperations()); } catch (Exception e) { fail("describe using DescribeTopics API should not fail", e); }