From e9e39ed95cbad83b81ebbd2e1712d6497b8e0aa4 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Thu, 5 Mar 2020 12:35:24 +0000 Subject: [PATCH] KAFKA-9661: Propagate includeSynonyms option to AdminClient in ConfigCommand --- .../scala/kafka/admin/ConfigCommand.scala | 8 +++-- .../unit/kafka/admin/ConfigCommandTest.scala | 34 +++++++++++++++++-- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index d8fc59d800e9d..562a91a45a46d 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -28,7 +28,7 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod import kafka.utils.Implicits._ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, ListTopicsOptions, Config => JConfig} +import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, Config => JConfig} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.errors.InvalidConfigurationException @@ -367,7 +367,7 @@ object ConfigCommand extends Config { println(s"Completed updating default config for $entityType in the cluster.") } - private def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = { + private[admin] def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = { val entityType = opts.entityTypes.head val entityName = opts.entityNames.headOption val describeAll = opts.options.has(opts.allOpt) @@ -426,7 +426,9 @@ object ConfigCommand extends Config { dynamicConfigSource val configResource = new ConfigResource(configResourceType, entityName) - val configs = adminClient.describeConfigs(Collections.singleton(configResource)).all.get(30, TimeUnit.SECONDS) + val describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms) + val configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOptions) + .all.get(30, TimeUnit.SECONDS) configs.get(configResource).entries.asScala .filter(entry => configSourceFilter match { case Some(configSource) => entry.source == configSource diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index a1f5f3968e683..e938a6d859cb9 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -359,6 +359,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { val node = new Node(1, "localhost", 9092) val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { + assertFalse("Config synonyms requested unnecessarily", options.includeSynonyms()) assertEquals(1, resources.size) val resource = resources.iterator.next assertEquals(resource.`type`, ConfigResource.Type.TOPIC) @@ -390,6 +391,34 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { EasyMock.reset(alterResult, describeResult) } + @Test + def shouldDescribeConfigSynonyms(): Unit = { + val resourceName = "my-topic" + val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", + "--entity-name", resourceName, + "--entity-type", "topics", + "--describe", + "--all")) + + val resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName) + val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] + future.complete(util.Collections.singletonMap(resource, new Config(util.Collections.emptyList[ConfigEntry]))) + val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult]) + EasyMock.expect(describeResult.all()).andReturn(future).once() + + val node = new Node(1, "localhost", 9092) + val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { + assertTrue("Synonyms not requested", options.includeSynonyms()) + assertEquals(Set(resource), resources.asScala.toSet) + describeResult + } + } + EasyMock.replay(describeResult) + ConfigCommand.describeConfig(mockAdminClient, describeOpts) + EasyMock.reset(describeResult) + } + @Test def shouldAddBrokerQuotaConfig(): Unit = { val alterOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, @@ -539,6 +568,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { + assertFalse("Config synonyms requested unnecessarily", options.includeSynonyms()) assertEquals(1, resources.size) val resource = resources.iterator.next assertEquals(ConfigResource.Type.BROKER, resource.`type`) @@ -585,7 +615,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { EasyMock.expect(alterResult.all()).andReturn(alterFuture) val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { - override def describeConfigs(resources: util.Collection[ConfigResource]): DescribeConfigsResult = { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { assertEquals(1, resources.size) val resource = resources.iterator.next assertEquals(ConfigResource.Type.BROKER_LOGGER, resource.`type`) @@ -1098,7 +1128,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { } class DummyAdminClient(node: Node) extends MockAdminClient(util.Collections.singletonList(node), node) { - override def describeConfigs(resources: util.Collection[ConfigResource]): DescribeConfigsResult = + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult]) override def incrementalAlterConfigs(configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]], options: AlterConfigsOptions): AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])