diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 716510fce3578..1f08ca951269b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2573,7 +2573,7 @@ public DescribeAclsResult describeAcls(final AclBindingFilter filter, DescribeAc
         final long now = time.milliseconds();
         final KafkaFutureImpl<Collection<AclBinding>> future = new KafkaFutureImpl<>();
         runnable.call(new Call("describeAcls", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
+            new LeastLoadedBrokerOrActiveKController()) {
 
             @Override
             DescribeAclsRequest.Builder createRequest(int timeoutMs) {
@@ -2620,7 +2620,7 @@ public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOption
         }
         final CreateAclsRequestData data = new CreateAclsRequestData().setCreations(aclCreations);
         runnable.call(new Call("createAcls", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
+            new LeastLoadedBrokerOrActiveKController()) {
 
             @Override
             CreateAclsRequest.Builder createRequest(int timeoutMs) {
@@ -2672,7 +2672,7 @@ public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteA
         }
         final DeleteAclsRequestData data = new DeleteAclsRequestData().setFilters(deleteAclsFilters);
         runnable.call(new Call("deleteAcls", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
+            new LeastLoadedBrokerOrActiveKController()) {
 
             @Override
             DeleteAclsRequest.Builder createRequest(int timeoutMs) {
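Switching these three ACL calls from LeastLoadedNodeProvider to LeastLoadedBrokerOrActiveKController lets the same admin-client code path serve clients bootstrapped with either bootstrap.servers or the bootstrap.controllers config from KIP-919. A minimal client-side sketch, assuming a KRaft controller quorum reachable at the placeholder address localhost:9093 with StandardAuthorizer enabled:

    import java.util.Collection;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclBindingFilter;

    public class DescribeAclsViaController {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Properties props = new Properties();
            // Bootstrap against the controller quorum instead of the brokers (placeholder endpoint).
            props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "localhost:9093");
            try (Admin admin = Admin.create(props)) {
                // With the provider change above, the describeAcls RPC is sent to a broker or,
                // in controller-bootstrap mode, to the active controller.
                Collection<AclBinding> acls = admin.describeAcls(AclBindingFilter.ANY).values().get();
                acls.forEach(System.out::println);
            }
        }
    }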
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index ccc2450594eaf..2eb68ffa36e1a 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -43,7 +43,7 @@ object AclCommand extends Logging {
 
   private val AuthorizerDeprecationMessage: String = "Warning: support for ACL configuration directly " +
     "through the authorizer is deprecated and will be removed in a future release. Please use " +
-    "--bootstrap-server instead to set ACLs through the admin client."
+    "--bootstrap-server or --bootstrap-controller instead to set ACLs through the admin client."
   private val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, PatternType.LITERAL)
 
   private val Newline = scala.util.Properties.lineSeparator
@@ -57,7 +57,7 @@ object AclCommand extends Logging {
     opts.checkArgs()
 
     val aclCommandService = {
-      if (opts.options.has(opts.bootstrapServerOpt)) {
+      if (opts.options.has(opts.bootstrapServerOpt) || opts.options.has(opts.bootstrapControllerOpt)) {
        new AdminClientService(opts)
       } else {
         val authorizerClassName = if (opts.options.has(opts.authorizerOpt))
@@ -97,7 +97,12 @@
         Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
       else
         new Properties()
-      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+
+      if (opts.options.has(opts.bootstrapServerOpt)) {
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+      } else {
+        props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, opts.options.valueOf(opts.bootstrapControllerOpt))
+      }
       val adminClient = Admin.create(props)
 
       try {
@@ -493,6 +498,12 @@
       .describedAs("server to connect to")
       .ofType(classOf[String])
 
+    val bootstrapControllerOpt: OptionSpec[String] = parser.accepts("bootstrap-controller", "A list of host/port pairs to use for establishing the connection to the KRaft controllers." +
+      " This list should be in the form host1:port1,host2:port2,... Like --bootstrap-server, this option manages ACLs through the admin client API.")
+      .withRequiredArg
+      .describedAs("controller to connect to")
+      .ofType(classOf[String])
+
     val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", CommandConfigDoc)
       .withOptionalArg()
       .describedAs("command-config")
@@ -626,18 +637,22 @@ object AclCommand extends Logging {
     options = parser.parse(args: _*)
 
     def checkArgs(): Unit = {
-      if (options.has(bootstrapServerOpt) && options.has(authorizerOpt))
-        CommandLineUtils.printUsageAndExit(parser, "Only one of --bootstrap-server or --authorizer must be specified")
+      if (options.has(bootstrapServerOpt) && options.has(bootstrapControllerOpt))
+        CommandLineUtils.printUsageAndExit(parser, "Only one of --bootstrap-server or --bootstrap-controller must be specified")
+
+      val hasServerOrController = options.has(bootstrapServerOpt) || options.has(bootstrapControllerOpt)
+      if (hasServerOrController && options.has(authorizerOpt))
+        CommandLineUtils.printUsageAndExit(parser, "The --authorizer option can only be used without --bootstrap-server or --bootstrap-controller")
 
-      if (!options.has(bootstrapServerOpt)) {
+      if (!hasServerOrController) {
         CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt)
         System.err.println(AclCommand.AuthorizerDeprecationMessage)
       }
 
-      if (options.has(commandConfigOpt) && !options.has(bootstrapServerOpt))
-        CommandLineUtils.printUsageAndExit(parser, "The --command-config option can only be used with --bootstrap-server option")
+      if (options.has(commandConfigOpt) && (!hasServerOrController))
+        CommandLineUtils.printUsageAndExit(parser, "The --command-config option can only be used with --bootstrap-server or --bootstrap-controller option")
 
-      if (options.has(authorizerPropertiesOpt) && options.has(bootstrapServerOpt))
+      if (options.has(authorizerPropertiesOpt) && hasServerOrController)
         CommandLineUtils.printUsageAndExit(parser, "The --authorizer-properties option can only be used with --authorizer option")
 
       val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
diff --git
a/core/src/test/java/kafka/admin/AclCommandTest.java b/core/src/test/java/kafka/admin/AclCommandTest.java index cd40e1a0e6fc2..97d643f2c251c 100644 --- a/core/src/test/java/kafka/admin/AclCommandTest.java +++ b/core/src/test/java/kafka/admin/AclCommandTest.java @@ -26,9 +26,9 @@ import kafka.test.annotation.Type; import kafka.test.junit.ClusterTestExtensions; import kafka.test.junit.ZkClusterInvocationContext; -import kafka.utils.TestUtils; import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.resource.PatternType; @@ -41,7 +41,7 @@ import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.SecurityUtils; import org.apache.kafka.metadata.authorizer.StandardAuthorizer; -import org.apache.kafka.server.authorizer.Authorizer; +import org.apache.kafka.test.TestUtils; import org.apache.log4j.Level; import org.junit.jupiter.api.Test; @@ -49,6 +49,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.IOException; import java.io.PrintStream; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; @@ -61,7 +62,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.stream.Collectors; import javax.management.InstanceAlreadyExistsException; @@ -111,6 +111,7 @@ public class AclCommandTest { private static final String AUTHORIZER_PROPERTIES = AUTHORIZER + "-properties"; private static final String ADD = "--add"; private static final String BOOTSTRAP_SERVER = "--bootstrap-server"; + private static final String BOOTSTRAP_CONTROLLER = "--bootstrap-controller"; private static final String COMMAND_CONFIG = "--command-config"; private static final String CONSUMER = "--consumer"; private static final String IDEMPOTENT = "--idempotent"; @@ -221,7 +222,7 @@ public class AclCommandTest { }}; @ClusterTest(types = {Type.ZK}) - public void testAclCliWithAuthorizer(ClusterInstance cluster) { + public void testAclCliWithAuthorizer(ClusterInstance cluster) throws InterruptedException { testAclCli(cluster, zkArgs(cluster)); } @@ -231,12 +232,40 @@ public void testAclCliWithAuthorizer(ClusterInstance cluster) { @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER) }) }) - public void testAclCliWithAdminAPI(ClusterInstance cluster) { + public void testAclCliWithAdminAPI(ClusterInstance cluster) throws InterruptedException { testAclCli(cluster, adminArgs(cluster.bootstrapServers(), Optional.empty())); } + + @ClusterTest(types = {Type.KRAFT}, serverProperties = { + @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER) + }) + public void testAclCliWithAdminAPIAndBootstrapController(ClusterInstance cluster) throws InterruptedException { + testAclCli(cluster, adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.empty())); + } + + @ClusterTests({ + @ClusterTest(types = {Type.ZK}), + @ClusterTest(types = {Type.KRAFT}, serverProperties = { + @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER) + }) + }) + public void testAclCliWithMisusingBootstrapServerToController(ClusterInstance cluster) { + assertThrows(RuntimeException.class, () -> testAclCli(cluster, adminArgsWithBootstrapController(cluster.bootstrapServers(), Optional.empty()))); + 
} + + @ClusterTests({ + @ClusterTest(types = {Type.ZK}), + @ClusterTest(types = {Type.KRAFT}, serverProperties = { + @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER) + }) + }) + public void testAclCliWithMisusingBootstrapControllerToServer(ClusterInstance cluster) { + assertThrows(RuntimeException.class, () -> testAclCli(cluster, adminArgs(cluster.bootstrapControllers(), Optional.empty()))); + } + @ClusterTest(types = {Type.ZK}) - public void testProducerConsumerCliWithAuthorizer(ClusterInstance cluster) { + public void testProducerConsumerCliWithAuthorizer(ClusterInstance cluster) throws InterruptedException { testProducerConsumerCli(cluster, zkArgs(cluster)); } @@ -246,32 +275,50 @@ public void testProducerConsumerCliWithAuthorizer(ClusterInstance cluster) { @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER) }) }) - public void testProducerConsumerCliWithAdminAPI(ClusterInstance cluster) { + public void testProducerConsumerCliWithAdminAPI(ClusterInstance cluster) throws InterruptedException { testProducerConsumerCli(cluster, adminArgs(cluster.bootstrapServers(), Optional.empty())); } + @ClusterTest(types = {Type.KRAFT}, serverProperties = { + @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER) + }) + public void testProducerConsumerCliWithAdminAPIAndBootstrapController(ClusterInstance cluster) throws InterruptedException { + testProducerConsumerCli(cluster, adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.empty())); + } + @ClusterTests({ @ClusterTest(types = {Type.ZK}), @ClusterTest(types = {Type.KRAFT}, serverProperties = { @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER) }) }) - public void testAclCliWithClientId(ClusterInstance cluster) { - LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); - appender.setClassLogger(AppInfoParser.class, Level.WARN); - try { + public void testAclCliWithClientId(ClusterInstance cluster) throws IOException, InterruptedException { + try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { + appender.setClassLogger(AppInfoParser.class, Level.WARN); testAclCli(cluster, adminArgs(cluster.bootstrapServers(), Optional.of(TestUtils.tempFile("client.id=my-client")))); - } finally { - appender.close(); + assertEquals(0, appender.getEvents().stream() + .filter(e -> e.getLevel().equals(Level.WARN.toString())) + .filter(e -> e.getThrowableClassName().filter(name -> name.equals(InstanceAlreadyExistsException.class.getName())).isPresent()) + .count(), "There should be no warnings about multiple registration of mbeans"); + } + } + + @ClusterTest(types = {Type.KRAFT}, serverProperties = { + @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER) + }) + public void testAclCliWithClientIdAndBootstrapController(ClusterInstance cluster) throws IOException, InterruptedException { + try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { + appender.setClassLogger(AppInfoParser.class, Level.WARN); + testAclCli(cluster, adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.of(TestUtils.tempFile("client.id=my-client")))); + assertEquals(0, appender.getEvents().stream() + .filter(e -> e.getLevel().equals(Level.WARN.toString())) + .filter(e -> e.getThrowableClassName().filter(name -> name.equals(InstanceAlreadyExistsException.class.getName())).isPresent()) + .count(), "There should be no warnings 
about multiple registration of mbeans"); } - assertEquals(0, appender.getEvents().stream() - .filter(e -> e.getLevel().equals(Level.WARN.toString())) - .filter(e -> e.getThrowableClassName().filter(name -> name.equals(InstanceAlreadyExistsException.class.getName())).isPresent()) - .count(), "There should be no warnings about multiple registration of mbeans"); } @ClusterTest(types = {Type.ZK}) - public void testAclsOnPrefixedResourcesWithAuthorizer(ClusterInstance cluster) { + public void testAclsOnPrefixedResourcesWithAuthorizer(ClusterInstance cluster) throws InterruptedException { testAclsOnPrefixedResources(cluster, zkArgs(cluster)); } @@ -281,10 +328,17 @@ public void testAclsOnPrefixedResourcesWithAuthorizer(ClusterInstance cluster) { @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER) }) }) - public void testAclsOnPrefixedResourcesWithAdminAPI(ClusterInstance cluster) { + public void testAclsOnPrefixedResourcesWithAdminAPI(ClusterInstance cluster) throws InterruptedException { testAclsOnPrefixedResources(cluster, adminArgs(cluster.bootstrapServers(), Optional.empty())); } + @ClusterTest(types = {Type.KRAFT}, serverProperties = { + @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER) + }) + public void testAclsOnPrefixedResourcesWithAdminAPIAndBootstrapController(ClusterInstance cluster) throws InterruptedException { + testAclsOnPrefixedResources(cluster, adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.empty())); + } + @ClusterTest(types = {Type.ZK}) public void testInvalidAuthorizerProperty(ClusterInstance cluster) { AclCommand.AuthorizerService aclCommandService = new AclCommand.AuthorizerService( @@ -309,11 +363,36 @@ public void testPatternTypesWithAdminAPI(ClusterInstance cluster) { testPatternTypes(adminArgs(cluster.bootstrapServers(), Optional.empty())); } + @ClusterTests({ + @ClusterTest(types = {Type.KRAFT}, serverProperties = { + @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER) + }) + }) + public void testPatternTypesWithAdminAPIAndBootstrapController(ClusterInstance cluster) { + testPatternTypes(adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.empty())); + } + + @Test + public void testUseBootstrapServerOptWithBootstrapControllerOpt() { + assertInitializeInvalidOptionsExitCodeAndMsg( + Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, BOOTSTRAP_CONTROLLER, LOCALHOST), + "Only one of --bootstrap-server or --bootstrap-controller must be specified" + ); + } + @Test public void testUseBootstrapServerOptWithAuthorizerOpt() { assertInitializeInvalidOptionsExitCodeAndMsg( Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, AUTHORIZER, ACL_AUTHORIZER), - "Only one of --bootstrap-server or --authorizer must be specified" + "The --authorizer option can only be used without --bootstrap-server or --bootstrap-controller" + ); + } + + @Test + public void testUseBootstrapControllerOptWithAuthorizerOpt() { + assertInitializeInvalidOptionsExitCodeAndMsg( + Arrays.asList(BOOTSTRAP_CONTROLLER, LOCALHOST, AUTHORIZER, ACL_AUTHORIZER), + "The --authorizer option can only be used without --bootstrap-server or --bootstrap-controller" ); } @@ -330,7 +409,7 @@ public void testRequiredArgsForAuthorizerOpt() { public void testUseCommandConfigOptWithoutBootstrapServerOpt() { assertInitializeInvalidOptionsExitCodeAndMsg( Arrays.asList(COMMAND_CONFIG, "cfg.properties", AUTHORIZER, ACL_AUTHORIZER, AUTHORIZER_PROPERTIES, ZOOKEEPER_CONNECT), - "The --command-config 
option can only be used with --bootstrap-server option" + "The --command-config option can only be used with --bootstrap-server or --bootstrap-controller option" ); } @@ -398,7 +477,7 @@ public void testInvalidArgs() { ); } - private void testProducerConsumerCli(ClusterInstance cluster, List cmdArgs) { + private void testProducerConsumerCli(ClusterInstance cluster, List cmdArgs) throws InterruptedException { for (Map.Entry, Map, Set>> entry : CMD_TO_RESOURCES_TO_ACL.entrySet()) { List cmd = entry.getKey(); Map, Set> resourcesToAcls = entry.getValue(); @@ -418,8 +497,7 @@ private void testProducerConsumerCli(ClusterInstance cluster, List cmdAr for (Map.Entry, Set> resourcesToAclsEntry : resourcesToAcls.entrySet()) { for (ResourcePattern resource : resourcesToAclsEntry.getKey()) { - withAuthorizer(cluster, authorizer -> - TestUtils.waitAndVerifyAcls(asScalaSet(resourcesToAclsEntry.getValue()), authorizer, resource, ANY)); + cluster.waitAcls(new AclBindingFilter(resource.toFilter(), ANY), resourcesToAclsEntry.getValue()); } } List resourceCmd = new ArrayList<>(resourceCommand); @@ -428,7 +506,7 @@ private void testProducerConsumerCli(ClusterInstance cluster, List cmdAr } } - private void testAclsOnPrefixedResources(ClusterInstance cluster, List cmdArgs) { + private void testAclsOnPrefixedResources(ClusterInstance cluster, List cmdArgs) throws InterruptedException { List cmd = Arrays.asList("--allow-principal", PRINCIPAL.toString(), PRODUCER, TOPIC, "Test-", RESOURCE_PATTERN_TYPE, "Prefixed"); @@ -437,17 +515,11 @@ private void testAclsOnPrefixedResources(ClusterInstance cluster, List c args.add(ADD); callMain(args); - withAuthorizer(cluster, authorizer -> { - AccessControlEntry writeAcl = new AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, WRITE, ALLOW); - AccessControlEntry describeAcl = new AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, DESCRIBE, ALLOW); - AccessControlEntry createAcl = new AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, CREATE, ALLOW); - TestUtils.waitAndVerifyAcls( - asScalaSet(new HashSet<>(Arrays.asList(writeAcl, describeAcl, createAcl))), - authorizer, - new ResourcePattern(ResourceType.TOPIC, "Test-", PREFIXED), - ANY - ); - }); + AccessControlEntry writeAcl = new AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, WRITE, ALLOW); + AccessControlEntry describeAcl = new AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, DESCRIBE, ALLOW); + AccessControlEntry createAcl = new AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, CREATE, ALLOW); + cluster.waitAcls(new AclBindingFilter(new ResourcePattern(ResourceType.TOPIC, "Test-", PREFIXED).toFilter(), ANY), + Arrays.asList(writeAcl, describeAcl, createAcl)); args = new ArrayList<>(cmdArgs); args.addAll(cmd); @@ -455,20 +527,10 @@ private void testAclsOnPrefixedResources(ClusterInstance cluster, List c args.add("--force"); callMain(args); - withAuthorizer(cluster, authorizer -> { - TestUtils.waitAndVerifyAcls( - asScalaSet(Collections.emptySet()), - authorizer, - new ResourcePattern(CLUSTER, "kafka-cluster", LITERAL), - ANY - ); - TestUtils.waitAndVerifyAcls( - asScalaSet(Collections.emptySet()), - authorizer, - new ResourcePattern(ResourceType.TOPIC, "Test-", PREFIXED), - ANY - ); - }); + cluster.waitAcls(new AclBindingFilter(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PREFIXED).toFilter(), ANY), + Collections.emptySet()); + cluster.waitAcls(new AclBindingFilter(new ResourcePattern(ResourceType.TOPIC, "Test-", PREFIXED).toFilter(), ANY), + Collections.emptySet()); } 
private static Map, Set> producerResourceToAcls(boolean enableIdempotence) { @@ -490,11 +552,17 @@ private List adminArgs(String bootstrapServer, Optional commandCon return adminArgs; } + private List adminArgsWithBootstrapController(String bootstrapController, Optional commandConfig) { + List adminArgs = new ArrayList<>(Arrays.asList(BOOTSTRAP_CONTROLLER, bootstrapController)); + commandConfig.ifPresent(file -> adminArgs.addAll(Arrays.asList(COMMAND_CONFIG, file.getAbsolutePath()))); + return adminArgs; + } + private Map.Entry callMain(List args) { return grabConsoleOutputAndError(() -> AclCommand.main(args.toArray(new String[0]))); } - private void testAclCli(ClusterInstance cluster, List cmdArgs) { + private void testAclCli(ClusterInstance cluster, List cmdArgs) throws InterruptedException { for (Map.Entry, List> entry : RESOURCE_TO_COMMAND.entrySet()) { Set resources = entry.getKey(); List resourceCmd = entry.getValue(); @@ -514,8 +582,7 @@ private void testAclCli(ClusterInstance cluster, List cmdArgs) { assertEquals("", out.getValue()); for (ResourcePattern resource : resources) { - withAuthorizer(cluster, authorizer -> - TestUtils.waitAndVerifyAcls(asScalaSet(aclToCommand.getKey()), authorizer, resource, ANY)); + cluster.waitAcls(new AclBindingFilter(resource.toFilter(), ANY), aclToCommand.getKey()); } resultArgs = new ArrayList<>(cmdArgs); @@ -592,7 +659,7 @@ private void testRemove( List cmdArgs, Set resources, List resourceCmd - ) { + ) throws InterruptedException { List args = new ArrayList<>(cmdArgs); args.addAll(resourceCmd); args.add(REMOVE); @@ -600,8 +667,7 @@ private void testRemove( Map.Entry out = callMain(args); assertEquals("", out.getValue()); for (ResourcePattern resource : resources) { - withAuthorizer(cluster, authorizer -> - TestUtils.waitAndVerifyAcls(asScalaSet(Collections.emptySet()), authorizer, resource, ANY)); + cluster.waitAcls(new AclBindingFilter(resource.toFilter(), ANY), Collections.emptySet()); } } @@ -628,23 +694,6 @@ private List getCmd(AclPermissionType permissionType) { return fullCmd; } - private void withAuthorizer(ClusterInstance cluster, Consumer consumer) { - if (cluster.isKRaftTest()) { - List allAuthorizers = new ArrayList<>(); - allAuthorizers.addAll(cluster.brokers().values().stream() - .map(server -> server.authorizer().get()).collect(Collectors.toList())); - allAuthorizers.addAll(cluster.controllers().values().stream() - .map(server -> server.authorizer().get()).collect(Collectors.toList())); - allAuthorizers.forEach(consumer); - } else { - consumer.accept(cluster.brokers().values().stream() - .findFirst() - .orElseThrow(() -> new RuntimeException("No broker found")) - .authorizer().get() - ); - } - } - /** * Capture both the console output and console error during the execution of the provided function. 
 */
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index 527d10afda575..fe32c76001a2e 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -27,15 +27,22 @@
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.test.TestUtils;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
@@ -207,4 +214,28 @@ default void waitForTopic(String topic, int partitions) throws InterruptedExcept
             60000L, "Timeout waiting for controller metadata propagating to brokers");
         }
     }
+
+    default List<Authorizer> authorizers() {
+        List<Authorizer> authorizers = new ArrayList<>();
+        authorizers.addAll(brokers().values().stream()
+                .filter(server -> server.authorizer().isDefined())
+                .map(server -> server.authorizer().get()).collect(Collectors.toList()));
+        authorizers.addAll(controllers().values().stream()
+                .filter(server -> server.authorizer().isDefined())
+                .map(server -> server.authorizer().get()).collect(Collectors.toList()));
+        return authorizers;
+    }
+
+    default void waitAcls(AclBindingFilter filter, Collection<AccessControlEntry> entries) throws InterruptedException {
+        for (Authorizer authorizer : authorizers()) {
+            AtomicReference<Set<AccessControlEntry>> actualEntries = new AtomicReference<>(new HashSet<>());
+            TestUtils.waitForCondition(() -> {
+                Set<AccessControlEntry> accessControlEntrySet = new HashSet<>();
+                authorizer.acls(filter).forEach(aclBinding -> accessControlEntrySet.add(aclBinding.entry()));
+                actualEntries.set(accessControlEntrySet);
+                return accessControlEntrySet.containsAll(entries) && entries.containsAll(accessControlEntrySet);
+            }, () -> "expected acls: " + entries + ", actual acls: " + actualEntries.get());
+        }
+    }
+
 }
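The new ClusterInstance#waitAcls helper polls every broker and controller authorizer until the bindings matching the filter equal the expected entry set, which is what lets the CLI tests drop the Scala TestUtils.waitAndVerifyAcls plumbing. A hypothetical test method built on it (the test name, topic, and principal are made up, and the snippet assumes the imports and constants such as STANDARD_AUTHORIZER and AUTHORIZER_CLASS_NAME_CONFIG already defined in AclCommandTest):

    @ClusterTest(types = {Type.KRAFT}, serverProperties = {
        @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
    })
    public void testReadAclPropagatesToAllAuthorizers(ClusterInstance cluster) throws InterruptedException {
        // Add an ACL through the CLI entry point, bootstrapping against the controllers.
        AclCommand.main(new String[]{
            "--bootstrap-controller", cluster.bootstrapControllers(),
            "--add", "--allow-principal", "User:alice",
            "--operation", "Read", "--topic", "demo-topic"});

        ResourcePattern topic = new ResourcePattern(ResourceType.TOPIC, "demo-topic", PatternType.LITERAL);
        AccessControlEntry expected =
            new AccessControlEntry("User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW);

        // Blocks until every broker and controller authorizer reports exactly this entry for the topic.
        cluster.waitAcls(new AclBindingFilter(topic.toFilter(), AccessControlEntryFilter.ANY),
            Collections.singleton(expected));
    }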
diff --git a/core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java b/core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java
index fbc1c45c5793e..917659b8e23a0 100644
--- a/core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java
+++ b/core/src/test/java/kafka/test/server/BootstrapControllersIntegrationTest.java
@@ -18,6 +18,7 @@
 package kafka.test.server;
 
 import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
 import kafka.test.annotation.ClusterTest;
 import kafka.test.annotation.ClusterTestDefaults;
 import kafka.test.annotation.Type;
@@ -43,11 +44,21 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.InvalidUpdateVersionException;
 import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
 import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.test.TestUtils;
 
@@ -70,6 +81,8 @@
 import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG;
 import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -297,4 +310,40 @@ private static List<List<Integer>> translatePartitionInfoToNodeIdList(List<TopicPartitionInfo> partitions) {
             .map(partition -> partition.replicas().stream().map(Node::id).collect(Collectors.toList()))
             .collect(Collectors.toList());
     }
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
+        @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer")
+    })
+    public void testAclsByControllers(ClusterInstance clusterInstance) throws Exception {
+        testAcls(clusterInstance, true);
+    }
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
+        @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer")
+    })
+    public void testAcls(ClusterInstance clusterInstance) throws Exception {
+        testAcls(clusterInstance, false);
+    }
+
+    private void testAcls(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
+        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
+            ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL);
+            AccessControlEntry accessControlEntry = new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW);
+            AclBinding aclBinding = new AclBinding(resourcePattern, accessControlEntry);
+            assertDoesNotThrow(() -> admin.createAcls(Collections.singleton(aclBinding)).all().get(1, TimeUnit.MINUTES));
+
+            clusterInstance.waitAcls(new AclBindingFilter(resourcePattern.toFilter(), AccessControlEntryFilter.ANY),
+                Collections.singleton(accessControlEntry));
+
+            Collection<AclBinding> aclBindings = admin.describeAcls(AclBindingFilter.ANY).values().get(1, TimeUnit.MINUTES);
+            assertEquals(1, aclBindings.size());
+            assertEquals(aclBinding, aclBindings.iterator().next());
+
+            Collection<AclBinding> deletedAclBindings = admin.deleteAcls(Collections.singleton(AclBindingFilter.ANY)).all().get(1, TimeUnit.MINUTES);
+            assertEquals(1, deletedAclBindings.size());
+            assertEquals(aclBinding, deletedAclBindings.iterator().next());
+        }
+    }
 }
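testAcls exercises the same create/describe/delete round trip over both endpoint types by delegating to the existing adminConfig(clusterInstance, usingBootstrapControllers) helper in this test class, whose body is outside this diff. Conceptually it only decides which bootstrap property to set; a rough, illustrative equivalent (not the actual helper):

    // Illustrative stand-in for the adminConfig(...) helper referenced above; names and structure are assumptions.
    private static Properties adminConfig(ClusterInstance clusterInstance, boolean usingBootstrapControllers) {
        Properties props = new Properties();
        if (usingBootstrapControllers) {
            // Bootstrap directly against the KRaft controller quorum (KIP-919).
            props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers());
        } else {
            // Conventional broker bootstrap.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
        }
        return props;
    }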
diff --git a/docs/security.html b/docs/security.html
index 6f3f81204a93e..5360f5ce2b85c 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1374,7 +1374,13 @@
         </tr>
         <tr>
             <td>--bootstrap-server</td>
-            <td>A list of host/port pairs to use for establishing the connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer option must be specified.</td>
+            <td>A list of host/port pairs to use for establishing the connection to the Kafka brokers. Only one of --bootstrap-server, --bootstrap-controller, or --authorizer must be specified.</td>
+            <td></td>
+            <td>Configuration</td>
+        </tr>
+        <tr>
+            <td>--bootstrap-controller</td>
+            <td>A list of host/port pairs to use for establishing the connection to the KRaft controllers. Only one of --bootstrap-server, --bootstrap-controller, or --authorizer must be specified.</td>
             <td></td>
             <td>Configuration</td>
         </tr>
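Together with the new documentation row, the CLI flow looks roughly like the following; the host/port, properties file path, principal, and topic are placeholders, and a --command-config with security settings matching the controller listener may be required:

    # Add an ACL by talking directly to a KRaft controller, then list it (placeholder endpoint).
    bin/kafka-acls.sh --bootstrap-controller localhost:9093 \
      --command-config config/controller-admin.properties \
      --add --allow-principal User:alice --operation Read --topic demo-topic

    bin/kafka-acls.sh --bootstrap-controller localhost:9093 \
      --command-config config/controller-admin.properties \
      --list --topic demo-topic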