Skip to content

Commit

Permalink
KAFKA-9661: Propagate includeSynonyms option to AdminClient in Config…
Browse files Browse the repository at this point in the history
…Command (#8229)
  • Loading branch information
mumrah authored Mar 5, 2020
1 parent 41f9d7a commit bd53249
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, ListTopicsOptions, Config => JConfig}
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, Config => JConfig}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
Expand Down Expand Up @@ -367,7 +367,7 @@ object ConfigCommand extends Config {
println(s"Completed updating default config for $entityType in the cluster.")
}

private def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
private[admin] def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
val entityType = opts.entityTypes.head
val entityName = opts.entityNames.headOption
val describeAll = opts.options.has(opts.allOpt)
Expand Down Expand Up @@ -426,7 +426,9 @@ object ConfigCommand extends Config {
dynamicConfigSource

val configResource = new ConfigResource(configResourceType, entityName)
val configs = adminClient.describeConfigs(Collections.singleton(configResource)).all.get(30, TimeUnit.SECONDS)
val describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms)
val configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOptions)
.all.get(30, TimeUnit.SECONDS)
configs.get(configResource).entries.asScala
.filter(entry => configSourceFilter match {
case Some(configSource) => entry.source == configSource
Expand Down
34 changes: 32 additions & 2 deletions core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
val node = new Node(1, "localhost", 9092)
val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
assertFalse("Config synonyms requested unnecessarily", options.includeSynonyms())
assertEquals(1, resources.size)
val resource = resources.iterator.next
assertEquals(resource.`type`, ConfigResource.Type.TOPIC)
Expand Down Expand Up @@ -390,6 +391,34 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
EasyMock.reset(alterResult, describeResult)
}

@Test
def shouldDescribeConfigSynonyms(): Unit = {
val resourceName = "my-topic"
val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
"--entity-name", resourceName,
"--entity-type", "topics",
"--describe",
"--all"))

val resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName)
val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
future.complete(util.Collections.singletonMap(resource, new Config(util.Collections.emptyList[ConfigEntry])))
val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
EasyMock.expect(describeResult.all()).andReturn(future).once()

val node = new Node(1, "localhost", 9092)
val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
assertTrue("Synonyms not requested", options.includeSynonyms())
assertEquals(Set(resource), resources.asScala.toSet)
describeResult
}
}
EasyMock.replay(describeResult)
ConfigCommand.describeConfig(mockAdminClient, describeOpts)
EasyMock.reset(describeResult)
}

@Test
def shouldAddBrokerQuotaConfig(): Unit = {
val alterOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
Expand Down Expand Up @@ -539,6 +568,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {

val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
assertFalse("Config synonyms requested unnecessarily", options.includeSynonyms())
assertEquals(1, resources.size)
val resource = resources.iterator.next
assertEquals(ConfigResource.Type.BROKER, resource.`type`)
Expand Down Expand Up @@ -585,7 +615,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
EasyMock.expect(alterResult.all()).andReturn(alterFuture)

val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
override def describeConfigs(resources: util.Collection[ConfigResource]): DescribeConfigsResult = {
override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
assertEquals(1, resources.size)
val resource = resources.iterator.next
assertEquals(ConfigResource.Type.BROKER_LOGGER, resource.`type`)
Expand Down Expand Up @@ -1098,7 +1128,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}

class DummyAdminClient(node: Node) extends MockAdminClient(util.Collections.singletonList(node), node) {
override def describeConfigs(resources: util.Collection[ConfigResource]): DescribeConfigsResult =
override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult =
EasyMock.createNiceMock(classOf[DescribeConfigsResult])
override def incrementalAlterConfigs(configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]],
options: AlterConfigsOptions): AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
Expand Down

0 comments on commit bd53249

Please sign in to comment.