Skip to content

Commit

Permalink
KAFKA-17385 Remove authorizer, authorizer-properties, zk-tls-config-f…
Browse files Browse the repository at this point in the history
…ile in AclCommand (apache#17224)

Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
FrankYang0529 authored Oct 8, 2024
1 parent 2805b42 commit 8db86c6
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 259 deletions.
196 changes: 5 additions & 191 deletions core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,15 @@ package kafka.admin
import java.util.Properties
import joptsimple._
import joptsimple.util.EnumConverter
import kafka.security.authorizer.AclAuthorizer
import kafka.utils._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Exit, Utils, SecurityUtils => JSecurityUtils}
import org.apache.kafka.security.authorizer.{AclEntry, AuthorizerUtils}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}

import scala.jdk.CollectionConverters._
Expand All @@ -41,9 +37,6 @@ import scala.io.StdIn

object AclCommand extends Logging {

private val AuthorizerDeprecationMessage: String = "Warning: support for ACL configuration directly " +
"through the authorizer is deprecated and will be removed in a future release. Please use " +
"--bootstrap-server or --bootstrap-controller instead to set ACLs through the admin client."
private val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, PatternType.LITERAL)

private val Newline = scala.util.Properties.lineSeparator
Expand All @@ -56,18 +49,7 @@ object AclCommand extends Logging {

opts.checkArgs()

val aclCommandService = {
if (opts.options.has(opts.bootstrapServerOpt) || opts.options.has(opts.bootstrapControllerOpt)) {
new AdminClientService(opts)
} else {
val authorizerClassName = if (opts.options.has(opts.authorizerOpt))
opts.options.valueOf(opts.authorizerOpt)
else
classOf[AclAuthorizer].getName

new AuthorizerService(authorizerClassName, opts)
}
}
val aclCommandService = new AdminClientService(opts)

try {
if (opts.options.has(opts.addOpt))
Expand All @@ -84,13 +66,7 @@ object AclCommand extends Logging {
}
}

sealed trait AclCommandService {
def addAcls(): Unit
def removeAcls(): Unit
def listAcls(): Unit
}

private class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging {
private class AdminClientService(val opts: AclCommandOptions) extends Logging {

private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit): Unit = {
val props = if (opts.options.has(opts.commandConfigOpt))
Expand Down Expand Up @@ -194,131 +170,6 @@ object AclCommand extends Logging {
}
}

class AuthorizerService(val authorizerClassName: String, val opts: AclCommandOptions) extends AclCommandService with Logging {

private def withAuthorizer()(f: Authorizer => Unit): Unit = {
// It is possible that zookeeper.set.acl could be true without SASL if mutual certificate authentication is configured.
// We will default the value of zookeeper.set.acl to true or false based on whether SASL is configured,
// but if SASL is not configured and zookeeper.set.acl is supposed to be true due to mutual certificate authentication
// then it will be up to the user to explicitly specify zookeeper.set.acl=true in the authorizer-properties.
val defaultProps = Map(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG -> JaasUtils.isZkSaslEnabled)
val authorizerPropertiesWithoutTls =
if (opts.options.has(opts.authorizerPropertiesOpt)) {
val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt)
defaultProps ++ CommandLineUtils.parseKeyValueArgs(authorizerProperties, false).asScala
} else {
defaultProps
}
val authorizerProperties =
if (opts.options.has(opts.zkTlsConfigFile)) {
// load in TLS configs both with and without the "authorizer." prefix
val validKeys = (ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList ++ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.map("authorizer." + _).toList).asJava
authorizerPropertiesWithoutTls ++ Utils.loadProps(opts.options.valueOf(opts.zkTlsConfigFile), validKeys).asInstanceOf[java.util.Map[String, Any]].asScala
}
else
authorizerPropertiesWithoutTls

val authZ = AuthorizerUtils.createAuthorizer(authorizerClassName)
try {
authZ.configure(authorizerProperties.asJava)
f(authZ)
}
finally Utils.closeQuietly(authZ, "authorizer")
}

def addAcls(): Unit = {
val resourceToAcl = getResourceToAcls(opts)
withAuthorizer() { authorizer =>
for ((resource, acls) <- resourceToAcl) {
println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
val aclBindings = acls.map(acl => new AclBinding(resource, acl))
authorizer.createAcls(null,aclBindings.toList.asJava).asScala.map(_.toCompletableFuture.get).foreach { result =>
result.exception.ifPresent { exception =>
println(s"Error while adding ACLs: ${exception.getMessage}")
println(Utils.stackTrace(exception))
}
}
}
}
}

def removeAcls(): Unit = {
withAuthorizer() { authorizer =>
val filterToAcl = getResourceFilterToAcls(opts)

for ((filter, acls) <- filterToAcl) {
if (acls.isEmpty) {
if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)"))
removeAcls(authorizer, acls, filter)
} else {
if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter `$filter`? (y/n)"))
removeAcls(authorizer, acls, filter)
}
}
}
}

def listAcls(): Unit = {
withAuthorizer() { authorizer =>
val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
val resourceToAcls = getAcls(authorizer, filters)

if (listPrincipals.isEmpty) {
for ((resource, acls) <- resourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
} else {
listPrincipals.foreach(principal => {
println(s"ACLs for principal `$principal`")
val filteredResourceToAcls = resourceToAcls.map { case (resource, acls) =>
resource -> acls.filter(acl => principal.toString.equals(acl.principal))
}.filter { case (_, acls) => acls.nonEmpty }

for ((resource, acls) <- filteredResourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
})
}
}
}

private def removeAcls(authorizer: Authorizer, acls: Set[AccessControlEntry], filter: ResourcePatternFilter): Unit = {
val result = if (acls.isEmpty)
authorizer.deleteAcls(null, List(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).asJava)
else {
val aclBindingFilters = acls.map(acl => new AclBindingFilter(filter, acl.toFilter)).toList.asJava
authorizer.deleteAcls(null, aclBindingFilters)
}
result.asScala.map(_.toCompletableFuture.get).foreach { result =>
result.exception.ifPresent { exception =>
println(s"Error while removing ACLs: ${exception.getMessage}")
println(Utils.stackTrace(exception))
}
result.aclBindingDeleteResults.forEach { deleteResult =>
deleteResult.exception.ifPresent { exception =>
println(s"Error while removing ACLs: ${exception.getMessage}")
println(Utils.stackTrace(exception))
}
}
}
}

private def getAcls(authorizer: Authorizer, filters: Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
val aclBindings =
if (filters.isEmpty) authorizer.acls(AclBindingFilter.ANY).asScala
else {
val results = for (filter <- filters) yield {
authorizer.acls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).asScala
}
results.reduceLeft(_ ++ _)
}

val resourceToAcls = mutable.Map[ResourcePattern, Set[AccessControlEntry]]().withDefaultValue(Set())

aclBindings.foreach(aclBinding => resourceToAcls(aclBinding.pattern()) = resourceToAcls(aclBinding.pattern()) + aclBinding.entry())
resourceToAcls.toMap
}
}

private def getResourceToAcls(opts: AclCommandOptions): Map[ResourcePattern, Set[AccessControlEntry]] = {
val patternType = opts.options.valueOf(opts.resourcePatternType)
if (!patternType.isSpecific)
Expand Down Expand Up @@ -509,21 +360,6 @@ object AclCommand extends Logging {
.describedAs("command-config")
.ofType(classOf[String])

val authorizerOpt: OptionSpec[String] = parser.accepts("authorizer", "DEPRECATED: Fully qualified class name of " +
"the authorizer, which defaults to kafka.security.authorizer.AclAuthorizer if --bootstrap-server is not provided. " +
AclCommand.AuthorizerDeprecationMessage)
.withRequiredArg
.describedAs("authorizer")
.ofType(classOf[String])

val authorizerPropertiesOpt: OptionSpec[String] = parser.accepts("authorizer-properties", "DEPRECATED: The " +
"properties required to configure an instance of the Authorizer specified by --authorizer. " +
"These are key=val pairs. For the default authorizer, example values are: zookeeper.connect=localhost:2181. " +
AclCommand.AuthorizerDeprecationMessage)
.withRequiredArg
.describedAs("authorizer-properties")
.ofType(classOf[String])

val topicOpt: OptionSpec[String] = parser.accepts("topic", "topic to which ACLs should be added or removed. " +
"A value of '*' indicates ACL should apply to all topics.")
.withRequiredArg
Expand Down Expand Up @@ -618,16 +454,6 @@ object AclCommand extends Logging {

val forceOpt: OptionSpecBuilder = parser.accepts("force", "Assume Yes to all queries and do not prompt.")

val zkTlsConfigFile: OptionSpec[String] = parser.accepts("zk-tls-config-file",
"DEPRECATED: Identifies the file where ZooKeeper client TLS connectivity properties are defined for" +
" the default authorizer kafka.security.authorizer.AclAuthorizer." +
" Any properties other than the following (with or without an \"authorizer.\" prefix) are ignored: " +
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.sorted.mkString(", ") +
". Note that if SASL is not configured and zookeeper.set.acl is supposed to be true due to mutual certificate authentication being used" +
" then it is necessary to explicitly specify --authorizer-properties zookeeper.set.acl=true. " +
AclCommand.AuthorizerDeprecationMessage)
.withRequiredArg().describedAs("Authorizer ZooKeeper TLS configuration").ofType(classOf[String])

val userPrincipalOpt: OptionSpec[String] = parser.accepts("user-principal", "Specifies a user principal as a resource in relation with the operation. For instance " +
"one could grant CreateTokens or DescribeTokens permission on a given user principal.")
.withRequiredArg()
Expand All @@ -640,20 +466,8 @@ object AclCommand extends Logging {
if (options.has(bootstrapServerOpt) && options.has(bootstrapControllerOpt))
CommandLineUtils.printUsageAndExit(parser, "Only one of --bootstrap-server or --bootstrap-controller must be specified")

val hasServerOrController = options.has(bootstrapServerOpt) || options.has(bootstrapControllerOpt)
if (hasServerOrController && options.has(authorizerOpt))
CommandLineUtils.printUsageAndExit(parser, "The --authorizer option can only be used without --bootstrap-server or --bootstrap-controller")

if (!hasServerOrController) {
CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt)
System.err.println(AclCommand.AuthorizerDeprecationMessage)
}

if (options.has(commandConfigOpt) && (!hasServerOrController))
CommandLineUtils.printUsageAndExit(parser, "The --command-config option can only be used with --bootstrap-server or --bootstrap-controller option")

if (options.has(authorizerPropertiesOpt) && hasServerOrController)
CommandLineUtils.printUsageAndExit(parser, "The --authorizer-properties option can only be used with --authorizer option")
if (!options.has(bootstrapServerOpt) && !options.has(bootstrapControllerOpt))
CommandLineUtils.printUsageAndExit(parser, "One of --bootstrap-server or --bootstrap-controller must be specified")

val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
if (actions != 1)
Expand Down
43 changes: 3 additions & 40 deletions core/src/test/java/kafka/admin/AclCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,8 @@
)
@ExtendWith(ClusterTestExtensions.class)
public class AclCommandTest {
public static final String ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer";
public static final String STANDARD_AUTHORIZER = "org.apache.kafka.metadata.authorizer.StandardAuthorizer";
private static final String LOCALHOST = "localhost:9092";
private static final String AUTHORIZER = "--authorizer";
private static final String AUTHORIZER_PROPERTIES = AUTHORIZER + "-properties";
private static final String ADD = "--add";
private static final String BOOTSTRAP_SERVER = "--bootstrap-server";
private static final String BOOTSTRAP_CONTROLLER = "--bootstrap-controller";
Expand All @@ -122,7 +119,6 @@ public class AclCommandTest {
private static final String OPERATION = "--operation";
private static final String TOPIC = "--topic";
private static final String RESOURCE_PATTERN_TYPE = "--resource-pattern-type";
private static final String ZOOKEEPER_CONNECT = "zookeeper.connect=localhost:2181";
private static final KafkaPrincipal PRINCIPAL = SecurityUtils.parseKafkaPrincipal("User:test2");
private static final Set<KafkaPrincipal> USERS = new HashSet<>(Arrays.asList(
SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
Expand Down Expand Up @@ -305,43 +301,10 @@ public void testUseBootstrapServerOptWithBootstrapControllerOpt() {
}

@Test
public void testUseBootstrapServerOptWithAuthorizerOpt() {
public void testUseWithoutBootstrapServerOptAndBootstrapControllerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, AUTHORIZER, ACL_AUTHORIZER),
"The --authorizer option can only be used without --bootstrap-server or --bootstrap-controller"
);
}

@Test
public void testUseBootstrapControllerOptWithAuthorizerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(BOOTSTRAP_CONTROLLER, LOCALHOST, AUTHORIZER, ACL_AUTHORIZER),
"The --authorizer option can only be used without --bootstrap-server or --bootstrap-controller"
);
}

@Test
public void testRequiredArgsForAuthorizerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(AUTHORIZER, ACL_AUTHORIZER),
"Missing required argument \"[authorizer-properties]\""
);
checkNotThrow(Arrays.asList(AUTHORIZER, ACL_AUTHORIZER, AUTHORIZER_PROPERTIES, ZOOKEEPER_CONNECT, LIST));
}

@Test
public void testUseCommandConfigOptWithoutBootstrapServerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(COMMAND_CONFIG, "cfg.properties", AUTHORIZER, ACL_AUTHORIZER, AUTHORIZER_PROPERTIES, ZOOKEEPER_CONNECT),
"The --command-config option can only be used with --bootstrap-server or --bootstrap-controller option"
);
}

@Test
public void testUseAuthorizerPropertiesOptWithBootstrapServerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, AUTHORIZER_PROPERTIES, ZOOKEEPER_CONNECT),
"The --authorizer-properties option can only be used with --authorizer option"
Collections.emptyList(),
"One of --bootstrap-server or --bootstrap-controller must be specified"
);
}

Expand Down
29 changes: 2 additions & 27 deletions docs/security.html
Original file line number Diff line number Diff line change
Expand Up @@ -1374,13 +1374,13 @@ <h4 class="anchor-heading"><a id="security_authz_cli" class="anchor-link"></a><a
</tr>
<tr>
<td>--bootstrap-server</td>
<td>A list of host/port pairs to use for establishing the connection to the Kafka cluster broker. Only one of --bootstrap-server, --bootstrap-controller, or --authorizer option must be specified.</td>
<td>A list of host/port pairs to use for establishing the connection to the Kafka cluster broker. Only one of --bootstrap-server or --bootstrap-controller option must be specified.</td>
<td></td>
<td>Configuration</td>
</tr>
<tr>
<td>--bootstrap-controller</td>
<td>A list of host/port pairs to use for establishing the connection to the Kafka cluster controller. Only one of --bootstrap-server, --bootstrap-controller, or --authorizer option must be specified.</td>
<td>A list of host/port pairs to use for establishing the connection to the Kafka cluster controller. Only one of --bootstrap-server or --bootstrap-controller option must be specified.</td>
<td></td>
<td>Configuration</td>
</tr>
Expand Down Expand Up @@ -1520,31 +1520,6 @@ <h4 class="anchor-heading"><a id="security_authz_cli" class="anchor-link"></a><a
<td></td>
<td>Convenience</td>
</tr>
<tr>
<td>--authorizer</td>
<td>(DEPRECATED: not supported in KRaft) Fully qualified class name of the authorizer.</td>
<td>kafka.security.authorizer.AclAuthorizer</td>
<td>Configuration</td>
</tr>
<tr>
<td>--authorizer-properties</td>
<td>(DEPRECATED: not supported in KRaft) key=val pairs that will be passed to authorizer for initialization. For the default authorizer in ZK clsuters, the example values are: zookeeper.connect=localhost:2181</td>
<td></td>
<td>Configuration</td>
</tr>
<tr>
<td>--zk-tls-config-file</td>
<td>(DEPRECATED: not supported in KRaft) Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined.
Any properties other than the following (with or without an "authorizer." prefix) are ignored:
zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable,
zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm,
zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type,
zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location,
zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type
</td>
<td></td>
<td>Configuration</td>
</tr>
</table>

<h4 class="anchor-heading"><a id="security_authz_examples" class="anchor-link"></a><a href="#security_authz_examples">Examples</a></h4>
Expand Down
Loading

0 comments on commit 8db86c6

Please sign in to comment.