Skip to content

Commit

Permalink
KAFKA-17036 KIP-919 supports for createAcls, deleteAcls, describeAcls (
Browse files Browse the repository at this point in the history
…#16493)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
FrankYang0529 authored Sep 13, 2024
1 parent d393636 commit 4692aeb
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2573,7 +2573,7 @@ public DescribeAclsResult describeAcls(final AclBindingFilter filter, DescribeAc
final long now = time.milliseconds();
final KafkaFutureImpl<Collection<AclBinding>> future = new KafkaFutureImpl<>();
runnable.call(new Call("describeAcls", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
new LeastLoadedBrokerOrActiveKController()) {

@Override
DescribeAclsRequest.Builder createRequest(int timeoutMs) {
Expand Down Expand Up @@ -2620,7 +2620,7 @@ public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOption
}
final CreateAclsRequestData data = new CreateAclsRequestData().setCreations(aclCreations);
runnable.call(new Call("createAcls", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
new LeastLoadedBrokerOrActiveKController()) {

@Override
CreateAclsRequest.Builder createRequest(int timeoutMs) {
Expand Down Expand Up @@ -2672,7 +2672,7 @@ public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteA
}
final DeleteAclsRequestData data = new DeleteAclsRequestData().setFilters(deleteAclsFilters);
runnable.call(new Call("deleteAcls", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
new LeastLoadedBrokerOrActiveKController()) {

@Override
DeleteAclsRequest.Builder createRequest(int timeoutMs) {
Expand Down
33 changes: 24 additions & 9 deletions core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object AclCommand extends Logging {

private val AuthorizerDeprecationMessage: String = "Warning: support for ACL configuration directly " +
"through the authorizer is deprecated and will be removed in a future release. Please use " +
"--bootstrap-server instead to set ACLs through the admin client."
"--bootstrap-server or --bootstrap-controller instead to set ACLs through the admin client."
private val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, PatternType.LITERAL)

private val Newline = scala.util.Properties.lineSeparator
Expand All @@ -57,7 +57,7 @@ object AclCommand extends Logging {
opts.checkArgs()

val aclCommandService = {
if (opts.options.has(opts.bootstrapServerOpt)) {
if (opts.options.has(opts.bootstrapServerOpt) || opts.options.has(opts.bootstrapControllerOpt)) {
new AdminClientService(opts)
} else {
val authorizerClassName = if (opts.options.has(opts.authorizerOpt))
Expand Down Expand Up @@ -97,7 +97,12 @@ object AclCommand extends Logging {
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else
new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))

if (opts.options.has(opts.bootstrapServerOpt)) {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
} else {
props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, opts.options.valueOf(opts.bootstrapControllerOpt))
}
val adminClient = Admin.create(props)

try {
Expand Down Expand Up @@ -493,6 +498,12 @@ object AclCommand extends Logging {
.describedAs("server to connect to")
.ofType(classOf[String])

val bootstrapControllerOpt: OptionSpec[String] = parser.accepts("bootstrap-controller", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
" This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.")
.withRequiredArg
.describedAs("controller to connect to")
.ofType(classOf[String])

val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", CommandConfigDoc)
.withOptionalArg()
.describedAs("command-config")
Expand Down Expand Up @@ -626,18 +637,22 @@ object AclCommand extends Logging {
options = parser.parse(args: _*)

def checkArgs(): Unit = {
if (options.has(bootstrapServerOpt) && options.has(authorizerOpt))
CommandLineUtils.printUsageAndExit(parser, "Only one of --bootstrap-server or --authorizer must be specified")
if (options.has(bootstrapServerOpt) && options.has(bootstrapControllerOpt))
CommandLineUtils.printUsageAndExit(parser, "Only one of --bootstrap-server or --bootstrap-controller must be specified")

val hasServerOrController = options.has(bootstrapServerOpt) || options.has(bootstrapControllerOpt)
if (hasServerOrController && options.has(authorizerOpt))
CommandLineUtils.printUsageAndExit(parser, "The --authorizer option can only be used without --bootstrap-server or --bootstrap-controller")

if (!options.has(bootstrapServerOpt)) {
if (!hasServerOrController) {
CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt)
System.err.println(AclCommand.AuthorizerDeprecationMessage)
}

if (options.has(commandConfigOpt) && !options.has(bootstrapServerOpt))
CommandLineUtils.printUsageAndExit(parser, "The --command-config option can only be used with --bootstrap-server option")
if (options.has(commandConfigOpt) && (!hasServerOrController))
CommandLineUtils.printUsageAndExit(parser, "The --command-config option can only be used with --bootstrap-server or --bootstrap-controller option")

if (options.has(authorizerPropertiesOpt) && options.has(bootstrapServerOpt))
if (options.has(authorizerPropertiesOpt) && hasServerOrController)
CommandLineUtils.printUsageAndExit(parser, "The --authorizer-properties option can only be used with --authorizer option")

val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
Expand Down
Loading

0 comments on commit 4692aeb

Please sign in to comment.