diff --git a/checkstyle/checkstyle-suppressions.xml b/checkstyle/checkstyle-suppressions.xml
index 47b07bb..69103f8 100644
--- a/checkstyle/checkstyle-suppressions.xml
+++ b/checkstyle/checkstyle-suppressions.xml
@@ -17,4 +17,7 @@
+
+
+
diff --git a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java
index a4076d1..6082274 100644
--- a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java
+++ b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java
@@ -30,6 +30,7 @@ public final class AivenAclAuthorizerConfig extends AbstractConfig {
private static final String AUDITOR_CLASS_NAME_CONF = "aiven.acl.authorizer.auditor.class.name";
private static final String LOG_DENIALS_CONF = "aiven.acl.authorizer.log.denials";
private static final String CONFIG_REFRESH_CONF = "aiven.acl.authorizer.config.refresh.interval";
+ private static final String LIST_ACLS_ENABLED_CONF = "aiven.acl.authorizer.list.acls.enabled";
public AivenAclAuthorizerConfig(final Map, ?> originals) {
super(configDef(), originals);
@@ -61,6 +62,12 @@ public static ConfigDef configDef() {
10_000,
ConfigDef.Importance.LOW,
"The interval between ACL reloads"
+ ).define(
+ LIST_ACLS_ENABLED_CONF,
+ ConfigDef.Type.BOOLEAN,
+ true,
+ ConfigDef.Importance.LOW,
+ "Whether to allow listing ACLs"
);
}
@@ -79,4 +86,8 @@ public boolean logDenials() {
public int configRefreshInterval() {
return getInt(CONFIG_REFRESH_CONF);
}
+
+ public boolean listAclsEnabled() {
+ return getBoolean(LIST_ACLS_ENABLED_CONF);
+ }
}
diff --git a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java
index 16b3c53..63b0188 100644
--- a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java
+++ b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java
@@ -57,6 +57,7 @@
import io.aiven.kafka.auth.json.reader.JsonReaderException;
import io.aiven.kafka.auth.nameformatters.LegacyOperationNameFormatter;
import io.aiven.kafka.auth.nameformatters.LegacyResourceTypeNameFormatter;
+import io.aiven.kafka.auth.nativeacls.AclAivenToNativeConverter;
import kafka.network.RequestChannel.Session;
import org.slf4j.Logger;
@@ -228,7 +229,7 @@ private void logAuthVerdict(final boolean verdict,
public final List extends CompletionStage> createAcls(
final AuthorizableRequestContext requestContext,
final List aclBindings) {
- LOGGER.error("`createAcls` is not implemented");
+ LOGGER.warn("`createAcls` is not implemented");
return List.of();
}
@@ -236,13 +237,20 @@ public final List extends CompletionStage> createAcls(
public final List extends CompletionStage> deleteAcls(
final AuthorizableRequestContext requestContext,
final List aclBindingFilters) {
- LOGGER.error("`deleteAcls` is not implemented");
+ LOGGER.warn("`deleteAcls` is not implemented");
return List.of();
}
@Override
public final Iterable acls(final AclBindingFilter filter) {
- LOGGER.error("`acls` is not implemented");
- return List.of();
+ if (this.config.listAclsEnabled()) {
+ return this.cacheReference.get().aclEntries().stream()
+ .flatMap(acl -> AclAivenToNativeConverter.convert(acl).stream())
+ .filter(filter::matches)
+ .collect(Collectors.toList());
+ } else {
+ LOGGER.warn("Listing ACLs is disabled");
+ return List.of();
+ }
}
}
diff --git a/src/main/java/io/aiven/kafka/auth/VerdictCache.java b/src/main/java/io/aiven/kafka/auth/VerdictCache.java
index fccba09..2d4d956 100644
--- a/src/main/java/io/aiven/kafka/auth/VerdictCache.java
+++ b/src/main/java/io/aiven/kafka/auth/VerdictCache.java
@@ -41,7 +41,6 @@ public boolean get(final KafkaPrincipal principal,
+ "|" + operation
+ "|" + principal.getName()
+ "|" + principal.getPrincipalType();
-
final Predicate matcher = aclEntry ->
aclEntry.check(principal.getPrincipalType(), principal.getName(), operation, resource);
return cache.computeIfAbsent(cacheKey, key -> aclEntries.stream().anyMatch(matcher));
@@ -50,6 +49,10 @@ public boolean get(final KafkaPrincipal principal,
}
}
+ public List aclEntries() {
+ return List.copyOf(aclEntries);
+ }
+
public static VerdictCache create(final List aclEntries) {
return new VerdictCache(aclEntries);
}
diff --git a/src/main/java/io/aiven/kafka/auth/json/AivenAcl.java b/src/main/java/io/aiven/kafka/auth/json/AivenAcl.java
index f636216..0b0af88 100644
--- a/src/main/java/io/aiven/kafka/auth/json/AivenAcl.java
+++ b/src/main/java/io/aiven/kafka/auth/json/AivenAcl.java
@@ -22,21 +22,21 @@
import com.google.gson.annotations.SerializedName;
-public class AivenAcl {
+public class AivenAcl {
@SerializedName("principal_type")
- private final String principalType;
+ public final String principalType;
@SerializedName("principal")
- private final Pattern principalRe;
+ public final Pattern principalRe;
@SerializedName("operation")
- private final Pattern operationRe;
+ public final Pattern operationRe;
@SerializedName("resource")
- private final Pattern resourceRe;
+ public final Pattern resourceRe;
@SerializedName("resource_pattern")
- private final String resourceRePattern;
+ public final String resourceRePattern;
public AivenAcl(final String principalType,
final String principal,
diff --git a/src/main/java/io/aiven/kafka/auth/nameformatters/OperationNameFormatter.java b/src/main/java/io/aiven/kafka/auth/nameformatters/OperationNameFormatter.java
new file mode 100644
index 0000000..7cc4fe6
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/auth/nameformatters/OperationNameFormatter.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2022 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nameformatters;
+
+import org.apache.kafka.common.acl.AclOperation;
+
+public class OperationNameFormatter {
+ public static AclOperation format(final String operation) {
+ switch (operation) {
+ case "Unknown":
+ return AclOperation.UNKNOWN;
+
+ case "Any":
+ return AclOperation.ANY;
+
+ case "All":
+ return AclOperation.ALL;
+
+ case "Read":
+ return AclOperation.READ;
+
+ case "Write":
+ return AclOperation.WRITE;
+
+ case "Create":
+ return AclOperation.CREATE;
+
+ case "Delete":
+ return AclOperation.DELETE;
+
+ case "Alter":
+ return AclOperation.ALTER;
+
+ case "Describe":
+ return AclOperation.DESCRIBE;
+
+ case "ClusterAction":
+ return AclOperation.CLUSTER_ACTION;
+
+ case "DescribeConfigs":
+ return AclOperation.DESCRIBE_CONFIGS;
+
+ case "AlterConfigs":
+ return AclOperation.ALTER_CONFIGS;
+
+ case "IdempotentWrite":
+ return AclOperation.IDEMPOTENT_WRITE;
+
+ default:
+ return AclOperation.UNKNOWN;
+ }
+ }
+}
diff --git a/src/main/java/io/aiven/kafka/auth/nameformatters/ResourceTypeNameFormatter.java b/src/main/java/io/aiven/kafka/auth/nameformatters/ResourceTypeNameFormatter.java
new file mode 100644
index 0000000..42971e4
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/auth/nameformatters/ResourceTypeNameFormatter.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2022 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nameformatters;
+
+import org.apache.kafka.common.resource.ResourceType;
+
+public class ResourceTypeNameFormatter {
+ public static ResourceType format(final String resourceType) {
+ switch (resourceType) {
+ case "Any":
+ return ResourceType.ANY;
+
+ case "Topic":
+ return ResourceType.TOPIC;
+
+ case "Group":
+ return ResourceType.GROUP;
+
+ case "Cluster":
+ return ResourceType.CLUSTER;
+
+ case "TransactionalId":
+ return ResourceType.TRANSACTIONAL_ID;
+
+ case "DelegationToken":
+ return ResourceType.DELEGATION_TOKEN;
+
+ default:
+ return ResourceType.UNKNOWN;
+ }
+ }
+}
diff --git a/src/main/java/io/aiven/kafka/auth/nativeacls/AclAivenToNativeConverter.java b/src/main/java/io/aiven/kafka/auth/nativeacls/AclAivenToNativeConverter.java
new file mode 100644
index 0000000..602461a
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/auth/nativeacls/AclAivenToNativeConverter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2022 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nativeacls;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclPermissionType;
+
+import io.aiven.kafka.auth.json.AivenAcl;
+
+public class AclAivenToNativeConverter {
+ public static List convert(final AivenAcl aivenAcl) {
+ final List result = new ArrayList<>();
+ if (aivenAcl.resourceRe == null) {
+ return result;
+ }
+
+ if (!Objects.equals(aivenAcl.principalType, "User")) {
+ return result;
+ }
+
+ for (final var operation : AclOperationsParser.parse(aivenAcl.operationRe.pattern())) {
+ final List principals = AclPrincipalFormatter.parse(
+ aivenAcl.principalType, aivenAcl.principalRe.pattern()
+ );
+ for (final var principal : principals) {
+ final var accessControlEntry = new AccessControlEntry(
+ principal, "*", operation, AclPermissionType.ALLOW);
+ for (final var resourcePattern : ResourcePatternParser.parse(aivenAcl.resourceRe.pattern())) {
+ result.add(new AclBinding(resourcePattern, accessControlEntry));
+ }
+ }
+ }
+
+ return result;
+ }
+}
diff --git a/src/main/java/io/aiven/kafka/auth/nativeacls/AclOperationsParser.java b/src/main/java/io/aiven/kafka/auth/nativeacls/AclOperationsParser.java
new file mode 100644
index 0000000..776ca05
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/auth/nativeacls/AclOperationsParser.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2022 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nativeacls;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.acl.AclOperation;
+
+import io.aiven.kafka.auth.nameformatters.OperationNameFormatter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AclOperationsParser {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AclOperationsParser.class);
+
+ // Visible for test
+ static Iterable parse(final String operationPattern) {
+ if (operationPattern == null) {
+ return List.of();
+ }
+
+ if (operationPattern.equals("^.*$") || operationPattern.equals("^(.*)$")) {
+ return List.of(
+ AclOperation.READ,
+ AclOperation.WRITE,
+ AclOperation.CREATE,
+ AclOperation.DELETE,
+ AclOperation.ALTER,
+ AclOperation.DESCRIBE,
+ AclOperation.CLUSTER_ACTION,
+ AclOperation.DESCRIBE_CONFIGS,
+ AclOperation.ALTER_CONFIGS,
+ AclOperation.IDEMPOTENT_WRITE
+ );
+ }
+
+ final List parsedOperationList = RegexParser.parse(operationPattern);
+ if (parsedOperationList == null) {
+ LOGGER.debug("Nothing parsed from operation {}", operationPattern);
+ return List.of();
+ }
+ return parsedOperationList.stream()
+ .map(OperationNameFormatter::format)
+ .filter(o -> o != AclOperation.UNKNOWN)
+ .collect(Collectors.toList());
+ }
+}
diff --git a/src/main/java/io/aiven/kafka/auth/nativeacls/AclPrincipalFormatter.java b/src/main/java/io/aiven/kafka/auth/nativeacls/AclPrincipalFormatter.java
new file mode 100644
index 0000000..232f765
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/auth/nativeacls/AclPrincipalFormatter.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2023 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nativeacls;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+class AclPrincipalFormatter {
+ // Visible for test
+ static List parse(final String principalType, final String principalPattern) {
+ if (principalType == null || principalPattern == null) {
+ return List.of();
+ }
+ List principals = RegexParser.parse(principalPattern);
+ if (principals == null) {
+ principals = List.of(principalPattern);
+ }
+ if (principals.contains("(.*)") || principals.contains(".*")) {
+ return List.of(principalType + ":*");
+ }
+ return principals.stream()
+ .map(p -> principalType + ":" + p)
+ .collect(Collectors.toList());
+ }
+
+}
diff --git a/src/main/java/io/aiven/kafka/auth/nativeacls/RegexParser.java b/src/main/java/io/aiven/kafka/auth/nativeacls/RegexParser.java
new file mode 100644
index 0000000..f0b4452
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/auth/nativeacls/RegexParser.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2022 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nativeacls;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+class RegexParser {
+
+ private static final Pattern PARSER_PATTERN = Pattern.compile("(?<=^\\^)(.*?)(?=\\$$)");
+
+ // Visible for test
+ /**
+ * Parses a regex pattern into a list.
+ *
+ * For example, ^(AAA|BBB|CCC)$
results in List("AAA", "BBB", "CCC")
,
+ * ^AAA$
results in List("AAA")
. Unparsable strings results in {@code null}.
+ */
+ static List parse(final String pattern) {
+ if (pattern == null) {
+ return null;
+ }
+
+ final Matcher matcher = PARSER_PATTERN.matcher(pattern);
+ if (!matcher.find() || matcher.groupCount() != 1) {
+ return null;
+ }
+
+ String group = matcher.group(0);
+ final int lastChar = group.length() - 1;
+ if (group.charAt(0) == '(' && group.charAt(lastChar) == ')') {
+ group = group.substring(1, lastChar);
+ }
+ return Arrays.stream(group.split("\\|"))
+ .filter(Predicate.not(String::isBlank))
+ .collect(Collectors.toList());
+ }
+}
diff --git a/src/main/java/io/aiven/kafka/auth/nativeacls/ResourcePatternParser.java b/src/main/java/io/aiven/kafka/auth/nativeacls/ResourcePatternParser.java
new file mode 100644
index 0000000..9a6e4d8
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/auth/nativeacls/ResourcePatternParser.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2022 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nativeacls;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ResourcePatternParser {
+ private static final Pattern PARSER_PATTERN = Pattern.compile("\\^\\(?(.*?)\\)?:\\(?(.*?)\\)?\\$");
+ private static final Logger LOGGER = LoggerFactory.getLogger(ResourcePatternParser.class);
+
+ // Visible for test
+
+ static Iterable parse(final String resourcePattern) {
+ if (resourcePattern == null) {
+ return List.of();
+ }
+
+ if (resourcePattern.equals("^.*$") || resourcePattern.equals("^(.*)$")) {
+ return List.of(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.CLUSTER, "*", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "*", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.DELEGATION_TOKEN, "*", PatternType.LITERAL)
+ );
+ }
+
+ final Matcher matcher = PARSER_PATTERN.matcher(resourcePattern);
+ if (!matcher.find() || matcher.groupCount() != 2) {
+ LOGGER.debug("Nothing parsed from resource pattern {}", resourcePattern);
+ return List.of();
+ }
+ final String resourceTypesGroup = matcher.group(1);
+ final String resourcesGroup = matcher.group(2);
+ if (resourceTypesGroup.isBlank() || resourcesGroup.isBlank()) {
+ LOGGER.debug("Parsed empty resource type or resource for {}", resourcePattern);
+ return List.of();
+ }
+ final List resourceTypes = Arrays.stream(resourceTypesGroup.split("\\|"))
+ .flatMap(type -> ResourceTypeParser.parse(type).stream())
+ .collect(Collectors.toList());
+
+ final List resources = Arrays.stream(resourcesGroup.split("\\|"))
+ .filter(Predicate.not(String::isBlank))
+ .collect(Collectors.toList());
+
+ final List result = new ArrayList<>(resourceTypes.size() * resources.size());
+ for (final ResourceType resourceType : resourceTypes) {
+ for (final String resource : resources) {
+ result.add(createResourcePattern(resourceType, resource));
+ }
+ }
+ return result;
+ }
+
+ private static ResourcePattern createResourcePattern(final ResourceType resourceType, final String resource) {
+ if (resource.equals(".*") || resource.equals("(.*)")) {
+ return new ResourcePattern(resourceType, "*", PatternType.LITERAL);
+ } else if (resource.endsWith("(.*)")) {
+ return new ResourcePattern(
+ resourceType, resource.substring(0, resource.length() - 4), PatternType.PREFIXED
+ );
+ } else {
+ return new ResourcePattern(resourceType, resource, PatternType.LITERAL);
+ }
+ }
+}
diff --git a/src/main/java/io/aiven/kafka/auth/nativeacls/ResourceTypeParser.java b/src/main/java/io/aiven/kafka/auth/nativeacls/ResourceTypeParser.java
new file mode 100644
index 0000000..46837d9
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/auth/nativeacls/ResourceTypeParser.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2022 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nativeacls;
+
+import java.util.List;
+
+import org.apache.kafka.common.resource.ResourceType;
+
+import io.aiven.kafka.auth.nameformatters.ResourceTypeNameFormatter;
+
+class ResourceTypeParser {
+ // Visible for test
+ static List parse(final String resourceTypePattern) {
+ if (resourceTypePattern == null) {
+ return List.of();
+ }
+
+ if (resourceTypePattern.equals("^.*$") || resourceTypePattern.equals("^(.*)$")) {
+ return List.of(
+ ResourceType.TOPIC,
+ ResourceType.GROUP,
+ ResourceType.CLUSTER,
+ ResourceType.TRANSACTIONAL_ID,
+ ResourceType.DELEGATION_TOKEN
+ );
+ }
+
+ return List.of(ResourceTypeNameFormatter.format(resourceTypePattern));
+ }
+}
diff --git a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java
index cda6a7c..60ed6ef 100644
--- a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java
+++ b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java
@@ -45,6 +45,7 @@ void correctMinimalConfig() {
assertEquals("/test", config.getConfigFile().getAbsolutePath());
assertEquals(NoAuditor.class, config.getAuditor().getClass());
assertTrue(config.logDenials());
+ assertTrue(config.listAclsEnabled());
}
@Test
@@ -55,12 +56,14 @@ void correctFullConfig() {
userActivityProps.put("aiven.acl.authorizer.auditor.aggregation.period", "123");
userActivityProps.put("aiven.acl.authorizer.log.denials", "false");
userActivityProps.put("aiven.acl.authorizer.config.refresh.interval", "10");
+ userActivityProps.put("aiven.acl.authorizer.list.acls.enabled", "false");
var config = new AivenAclAuthorizerConfig(userActivityProps);
assertEquals("/test", config.getConfigFile().getAbsolutePath());
assertEquals(UserActivityAuditor.class, config.getAuditor().getClass());
assertFalse(config.logDenials());
assertEquals(10, config.configRefreshInterval());
+ assertFalse(config.listAclsEnabled());
final Map userActivityOpsProps = new HashMap<>();
userActivityOpsProps.put("aiven.acl.authorizer.configuration", "/test");
diff --git a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerV2Test.java b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerV2Test.java
index 6fe36bb..bbb88ac 100644
--- a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerV2Test.java
+++ b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerV2Test.java
@@ -20,12 +20,18 @@
import java.net.InetAddress;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -33,6 +39,8 @@
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.Action;
@@ -52,14 +60,14 @@
public class AivenAclAuthorizerV2Test {
static final ResourcePattern TOPIC_RESOURCE = new ResourcePattern(
- org.apache.kafka.common.resource.ResourceType.TOPIC,
- "Target",
- PatternType.LITERAL
+ org.apache.kafka.common.resource.ResourceType.TOPIC,
+ "Target",
+ PatternType.LITERAL
);
static final ResourcePattern GROUP_RESOURCE = new ResourcePattern(
- org.apache.kafka.common.resource.ResourceType.GROUP,
- "Target",
- PatternType.LITERAL
+ org.apache.kafka.common.resource.ResourceType.GROUP,
+ "Target",
+ PatternType.LITERAL
);
static final AclOperation READ_OPERATION = AclOperation.READ;
static final AclOperation CREATE_OPERATION = AclOperation.CREATE;
@@ -68,6 +76,9 @@ public class AivenAclAuthorizerV2Test {
+ "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}]";
static final String ACL_JSON_NOTYPE =
"[{\"principal\":\"^pass$\",\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}]";
+ static final String ACL_TOPIC_PREFIX_JSON =
+ "[{\"principal_type\":\"User\",\"principal\":\"^pass$\","
+ + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(prefix-(.*))$\"}]";
static final String ACL_JSON_LONG = "["
+ "{\"principal_type\":\"User\",\"principal\":\"^pass-0$\","
+ "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"},"
@@ -109,8 +120,8 @@ public class AivenAclAuthorizerV2Test {
void setUp() {
configFilePath = tmpDir.resolve("acl.json");
configs = Map.of(
- "aiven.acl.authorizer.configuration", configFilePath.toString(),
- "aiven.acl.authorizer.config.refresh.interval", "10");
+ "aiven.acl.authorizer.configuration", configFilePath.toString(),
+ "aiven.acl.authorizer.config.refresh.interval", "10");
auth.configure(configs);
}
@@ -119,6 +130,122 @@ void tearDown() {
auth.close();
}
+ @Test
+ public void testAclsMethodWhenListingEnabled() throws IOException {
+ Files.copy(this.getClass().getResourceAsStream("/test_acls_for_acls_method.json"), configFilePath);
+ startAuthorizer();
+
+ assertThat(auth.acls(AclBindingFilter.ANY))
+ .containsExactly(
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.ALTER, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.ALTER_CONFIGS, AclPermissionType.ALLOW)
+ ),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.DELETE, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.READ, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.WRITE, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new AccessControlEntry(
+ "User:test\\-user", "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW
+ )),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "prefix\\.", PatternType.PREFIXED),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.READ, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-admin", "*", AclOperation.CREATE, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-admin", "*", AclOperation.DELETE, AclPermissionType.ALLOW))
+ );
+
+
+ assertThat(auth.acls(new AclBindingFilter(
+ new ResourcePatternFilter(ResourceType.TOPIC, "xxx", PatternType.MATCH),
+ new AccessControlEntryFilter("User:test\\-user", "*", AclOperation.ALTER_CONFIGS, AclPermissionType.ALLOW)
+ ))).containsExactly(
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.ALTER_CONFIGS, AclPermissionType.ALLOW))
+ );
+
+ assertThat(auth.acls(new AclBindingFilter(
+ new ResourcePatternFilter(ResourceType.TOPIC, "prefix\\.example", PatternType.MATCH),
+ AccessControlEntryFilter.ANY
+ ))).containsExactly(
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new AccessControlEntry(
+ "User:test\\-user", "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW
+ )),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "prefix\\.", PatternType.PREFIXED),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.READ, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-admin", "*", AclOperation.CREATE, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-admin", "*", AclOperation.DELETE, AclPermissionType.ALLOW))
+ );
+
+ assertThat(auth.acls(new AclBindingFilter(
+ new ResourcePatternFilter(ResourceType.TOPIC, "xxx", PatternType.MATCH),
+ new AccessControlEntryFilter("User:test\\-user", "*", AclOperation.ANY, AclPermissionType.ALLOW)
+ ))).containsExactly(
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.ALTER, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.ALTER_CONFIGS, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.DELETE, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.READ, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.WRITE, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW))
+ );
+ }
+
+ @Test
+ public void testAclsMethodWhenListingDisabled() throws IOException {
+ final var configsUpdated = new HashMap<>(configs);
+ configsUpdated.put("aiven.acl.authorizer.list.acls.enabled", "false");
+ auth.configure(configsUpdated);
+
+ Files.copy(this.getClass().getResourceAsStream("/test_acls_for_acls_method.json"), configFilePath);
+ startAuthorizer();
+
+ assertThat(auth.acls(null))
+ .isEmpty();
+ }
+
@Test
public void testAivenAclAuthorizer() throws IOException, InterruptedException {
Files.write(configFilePath, ACL_JSON.getBytes());
@@ -151,6 +278,15 @@ public void testAivenAclAuthorizer() throws IOException, InterruptedException {
checkSingleAction(requestCtx("User", "pass"), action(READ_OPERATION, TOPIC_RESOURCE), true);
checkSingleAction(requestCtx("NonUser", "pass"), action(READ_OPERATION, TOPIC_RESOURCE), true);
+ Files.write(configFilePath, ACL_TOPIC_PREFIX_JSON.getBytes());
+ Thread.sleep(100);
+
+ checkSingleAction(requestCtx("User", "pass"), action(READ_OPERATION, new ResourcePattern(
+ org.apache.kafka.common.resource.ResourceType.TOPIC,
+ "prefix-topic",
+ PatternType.LITERAL
+ )), true);
+
Files.write(configFilePath, ACL_JSON_LONG.getBytes());
Thread.sleep(100);
diff --git a/src/test/java/io/aiven/kafka/auth/nativeacls/AclAivenToNativeConverterTest.java b/src/test/java/io/aiven/kafka/auth/nativeacls/AclAivenToNativeConverterTest.java
new file mode 100644
index 0000000..eafec9c
--- /dev/null
+++ b/src/test/java/io/aiven/kafka/auth/nativeacls/AclAivenToNativeConverterTest.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2022 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nativeacls;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+
+import io.aiven.kafka.auth.json.AivenAcl;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class AclAivenToNativeConverterTest {
+ @Test
+ public final void testConvertSimple() {
+ final var result = AclAivenToNativeConverter.convert(
+ new AivenAcl(
+ "User",
+ "^(test\\-user)$",
+ "^(Alter|AlterConfigs|Delete|Read|Write)$",
+ "^Topic:(xxx)$",
+ null
+ )
+ );
+ final ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL);
+ assertThat(result).containsExactly(
+ new AclBinding(
+ resourcePattern,
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.ALTER, AclPermissionType.ALLOW)),
+ new AclBinding(
+ resourcePattern,
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.ALTER_CONFIGS, AclPermissionType.ALLOW)),
+ new AclBinding(
+ resourcePattern,
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.DELETE, AclPermissionType.ALLOW)),
+ new AclBinding(
+ resourcePattern,
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.READ, AclPermissionType.ALLOW)),
+ new AclBinding(
+ resourcePattern,
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.WRITE, AclPermissionType.ALLOW))
+ );
+ }
+
+ @Test
+ public final void testConvertPrefix() {
+ final var result = AclAivenToNativeConverter.convert(
+ new AivenAcl(
+ "User",
+ "^(test\\-user)$",
+ "^Read$",
+ "^Topic:(topic\\.(.*))$",
+ null
+ )
+ );
+ assertThat(result).containsExactly(
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "topic\\.", PatternType.PREFIXED),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.READ, AclPermissionType.ALLOW)
+ )
+ );
+ }
+
+ @Test
+ public final void testConvertMultiplePrefixes() {
+ final var result = AclAivenToNativeConverter.convert(
+ new AivenAcl(
+ "User",
+ "^(test\\-user)$",
+ "^(Delete|Read|Write)$",
+ "^Topic:(topic\\.(.*)|prefix\\-(.*))$",
+ null
+ )
+ );
+ assertThat(result).containsExactly(
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "topic\\.", PatternType.PREFIXED),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.DELETE, AclPermissionType.ALLOW)
+ ),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "prefix\\-", PatternType.PREFIXED),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.DELETE, AclPermissionType.ALLOW)
+ ),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "topic\\.", PatternType.PREFIXED),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.READ, AclPermissionType.ALLOW)
+ ),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "prefix\\-", PatternType.PREFIXED),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.READ, AclPermissionType.ALLOW)
+ ),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "topic\\.", PatternType.PREFIXED),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.WRITE, AclPermissionType.ALLOW)
+ ),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "prefix\\-", PatternType.PREFIXED),
+ new AccessControlEntry("User:test\\-user", "*", AclOperation.WRITE, AclPermissionType.ALLOW)
+ )
+ );
+ }
+
+ @Test
+ public final void testSuperadmin() {
+ final var result = AclAivenToNativeConverter.convert(
+ new AivenAcl(
+ "User",
+ "^(admin)$",
+ "^(.*)$",
+ "^(.*)$",
+ null
+ )
+ );
+
+ final List expected = new ArrayList<>();
+ final List expectedResourceTypes = List.of(
+ ResourceType.TOPIC, ResourceType.GROUP, ResourceType.CLUSTER,
+ ResourceType.TRANSACTIONAL_ID, ResourceType.DELEGATION_TOKEN);
+ final List expectedAclOperations = List.of(
+ AclOperation.READ, AclOperation.WRITE, AclOperation.CREATE,
+ AclOperation.DELETE, AclOperation.ALTER, AclOperation.DESCRIBE,
+ AclOperation.CLUSTER_ACTION, AclOperation.DESCRIBE_CONFIGS,
+ AclOperation.ALTER_CONFIGS, AclOperation.IDEMPOTENT_WRITE);
+ for (final var resourceType : expectedResourceTypes) {
+ for (final var aclOperation : expectedAclOperations) {
+ expected.add(new AclBinding(
+ new ResourcePattern(resourceType, "*", PatternType.LITERAL),
+ new AccessControlEntry("User:admin", "*", aclOperation, AclPermissionType.ALLOW))
+ );
+ }
+ }
+ assertThat(result).containsAll(expected);
+ }
+
+ @Test
+ public final void testAllUsers() {
+ final var result = AclAivenToNativeConverter.convert(
+ new AivenAcl(
+ "User",
+ "^(.*)$",
+ "^Read$",
+ "^Topic:(xxx)$",
+ null
+ )
+ );
+
+ assertThat(result).containsExactly(
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new AccessControlEntry("User:*", "*", AclOperation.READ, AclPermissionType.ALLOW)
+ )
+ );
+ }
+
+ @Test
+ public final void testNoUserPrincipalType() {
+ final var result = AclAivenToNativeConverter.convert(
+ new AivenAcl(
+ "Group",
+ "^example$",
+ "^Read$",
+ "^Topic:(xxx)$",
+ null
+ )
+ );
+
+ assertThat(result).isEmpty();
+ }
+}
diff --git a/src/test/java/io/aiven/kafka/auth/nativeacls/AclOperationsParserTest.java b/src/test/java/io/aiven/kafka/auth/nativeacls/AclOperationsParserTest.java
new file mode 100644
index 0000000..a999702
--- /dev/null
+++ b/src/test/java/io/aiven/kafka/auth/nativeacls/AclOperationsParserTest.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2022 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nativeacls;
+
+import org.apache.kafka.common.acl.AclOperation;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class AclOperationsParserTest {
+ @Test
+ public final void parseAclOperationsSingle() {
+ assertThat(AclOperationsParser.parse("^Create$"))
+ .containsExactly(AclOperation.CREATE);
+ }
+
+ @Test
+ public final void parseAclOperationsSingleWithParens() {
+ assertThat(AclOperationsParser.parse("^(Create)$"))
+ .containsExactly(AclOperation.CREATE);
+ }
+
+ @Test
+ public final void parseAclOperationsMultiple() {
+ // List all possible here, apart from Unknown.
+ final String operationPattern = "^(Any|All|Read|Write|Create|Delete|Alter|Describe|"
+ + "ClusterAction|DescribeConfigs|AlterConfigs|IdempotentWrite)$";
+ assertThat(AclOperationsParser.parse(operationPattern))
+ .containsExactly(
+ AclOperation.ANY,
+ AclOperation.ALL,
+ AclOperation.READ,
+ AclOperation.WRITE,
+ AclOperation.CREATE,
+ AclOperation.DELETE,
+ AclOperation.ALTER,
+ AclOperation.DESCRIBE,
+ AclOperation.CLUSTER_ACTION,
+ AclOperation.DESCRIBE_CONFIGS,
+ AclOperation.ALTER_CONFIGS,
+ AclOperation.IDEMPOTENT_WRITE
+ );
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"^(.*)$", "^.*$"})
+ public final void parseAclOperationsGlobalWildcard(final String value) {
+ assertThat(AclOperationsParser.parse(value))
+ .containsExactly(
+ AclOperation.READ,
+ AclOperation.WRITE,
+ AclOperation.CREATE,
+ AclOperation.DELETE,
+ AclOperation.ALTER,
+ AclOperation.DESCRIBE,
+ AclOperation.CLUSTER_ACTION,
+ AclOperation.DESCRIBE_CONFIGS,
+ AclOperation.ALTER_CONFIGS,
+ AclOperation.IDEMPOTENT_WRITE
+ );
+ }
+
+ @Test
+ public final void parseAclOperationsSingleUnknown() {
+ assertThat(AclOperationsParser.parse("^(Some)$"))
+ .isEmpty();
+ }
+
+ @Test
+ public final void parseAclOperationsNull() {
+ assertThat(AclOperationsParser.parse(null))
+ .isEmpty();
+ }
+
+ @Test
+ public final void parseAclOperationsMultipleWithUnknown() {
+ assertThat(AclOperationsParser.parse("^(Create|Describe|AlterConfigs|Some)$"))
+ .containsExactly(
+ AclOperation.CREATE,
+ AclOperation.DESCRIBE,
+ AclOperation.ALTER_CONFIGS);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "^(Create|Delete)",
+ "(Create|Delete)$",
+ "^(Create|Delete",
+ "Create|Delete)$"
+ })
+ public final void parseAclOperationsInvalid(final String pattern) {
+ assertThat(AclOperationsParser.parse(pattern))
+ .isEmpty();
+ }
+}
diff --git a/src/test/java/io/aiven/kafka/auth/nativeacls/AclPrincipalFormatterTest.java b/src/test/java/io/aiven/kafka/auth/nativeacls/AclPrincipalFormatterTest.java
new file mode 100644
index 0000000..d940fd2
--- /dev/null
+++ b/src/test/java/io/aiven/kafka/auth/nativeacls/AclPrincipalFormatterTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2023 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nativeacls;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class AclPrincipalFormatterTest {
+
+ @Test
+ public final void parseSinglePrincipal() {
+ assertThat(AclPrincipalFormatter.parse("User", "^username$"))
+ .containsExactly("User:username");
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"(.*)", ".*", "^(username|(.*))$", "^(username|.*)$"})
+ public final void parseWildcardPrincipal(final String value) {
+ assertThat(AclPrincipalFormatter.parse("User", value))
+ .containsExactly("User:*");
+ }
+
+ @Test
+ public final void parseNotWildcard() {
+ assertThat(AclPrincipalFormatter.parse("User", "username.*"))
+ .containsExactly("User:username.*");
+ }
+
+ @Test
+ public final void parseMultipleUsers() {
+ assertThat(AclPrincipalFormatter.parse("User", "^(user1|user2)$"))
+ .containsExactly("User:user1", "User:user2");
+ }
+
+ @Test
+ public final void parseNullPrincipal() {
+ assertThat(AclPrincipalFormatter.parse("User", null))
+ .isEmpty();
+ }
+
+ @Test
+ public final void parseNullPrincipalType() {
+ assertThat(AclPrincipalFormatter.parse(null, "^username$"))
+ .isEmpty();
+ }
+
+}
diff --git a/src/test/java/io/aiven/kafka/auth/nativeacls/RegexParserTest.java b/src/test/java/io/aiven/kafka/auth/nativeacls/RegexParserTest.java
new file mode 100644
index 0000000..d33f523
--- /dev/null
+++ b/src/test/java/io/aiven/kafka/auth/nativeacls/RegexParserTest.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2022 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nativeacls;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RegexParserTest {
+ @Test
+ public final void parseRegexListSingle() {
+ assertThat(RegexParser.parse("^AAA$"))
+ .containsExactly("AAA");
+ }
+
+ @Test
+ public final void parseRegexListSingleWithParens() {
+ assertThat(RegexParser.parse("^(AAA)$"))
+ .containsExactly("AAA");
+ }
+
+ @Test
+ public final void parseParenthesis() {
+ assertThat(RegexParser.parse("^qwe)$"))
+ .containsExactly("qwe)");
+ }
+
+ @Test
+ public final void parseNestedRegex() {
+ assertThat(RegexParser.parse("^(AAA|^(BB)$)$"))
+ .containsExactly("AAA", "^(BB)$");
+ }
+
+ @Test
+ public final void parseRegexListMultiple() {
+ assertThat(RegexParser.parse("^(AAA|BBB|CCC|DDD)$"))
+ .containsExactly("AAA", "BBB", "CCC", "DDD");
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "^(AAA|BBB|CCC|DDD)",
+ "(AAA|BBB|CCC|DDD)$",
+ "^(AAA|BBB|CCC|DDD",
+ "AAA|BBB|CCC|DDD)$"
+ })
+ public final void parseRegexListInvalid(final String pattern) {
+ assertThat(RegexParser.parse(pattern))
+ .isNull();
+ }
+}
diff --git a/src/test/java/io/aiven/kafka/auth/nativeacls/ResourcePatternParserTest.java b/src/test/java/io/aiven/kafka/auth/nativeacls/ResourcePatternParserTest.java
new file mode 100644
index 0000000..061955b
--- /dev/null
+++ b/src/test/java/io/aiven/kafka/auth/nativeacls/ResourcePatternParserTest.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2022 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nativeacls;
+
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ResourcePatternParserTest {
+ @ParameterizedTest
+ @ValueSource(strings = {"^(.*)$", "^.*$"})
+ public final void parseResourcePatternGlobalWildcard(final String value) {
+ assertThat(ResourcePatternParser.parse(value))
+ .containsExactly(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.CLUSTER, "*", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "*", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.DELEGATION_TOKEN, "*", PatternType.LITERAL)
+ );
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"^Cluster:(.*)$", "^Cluster:.*$"})
+ public final void parseResourcePatternSingleResourceTypeWildcard(final String value) {
+ assertThat(ResourcePatternParser.parse(value))
+ .containsExactly(new ResourcePattern(ResourceType.CLUSTER, "*", PatternType.LITERAL));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"^TransactionalId:(xxx)$", "^TransactionalId:xxx$"})
+ public final void parseResourcePatternSingleResourceTypeSingleResource(final String value) {
+ assertThat(ResourcePatternParser.parse(value))
+ .containsExactly(new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "xxx", PatternType.LITERAL));
+ }
+
+ @Test
+ public final void parseResourcePatternSingleResourceTypeMultipleResource() {
+ assertThat(ResourcePatternParser.parse("^Topic:(xxx|yyy|bac.*xyz)$"))
+ .containsExactly(
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.TOPIC, "yyy", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.TOPIC, "bac.*xyz", PatternType.LITERAL)
+ );
+ }
+
+ @Test
+ public final void parseResourcePatternMultipleResourceTypeWildcard() {
+ assertThat(ResourcePatternParser.parse("^(Cluster|Topic):(.*)$"))
+ .containsExactly(
+ new ResourcePattern(ResourceType.CLUSTER, "*", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL)
+ );
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"^(Cluster|Topic):(xxx)$", "^(Cluster|Topic):xxx$"})
+ public final void parseResourcePatternMultipleResourceTypeSingleResource(final String value) {
+ assertThat(ResourcePatternParser.parse(value))
+ .containsExactly(
+ new ResourcePattern(ResourceType.CLUSTER, "xxx", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL)
+ );
+ }
+
+ @Test
+ public final void parseResourcePatternMultipleResourceTypeMultipleResource() {
+ assertThat(ResourcePatternParser.parse("^(Cluster|Topic):(xxx|yyy|bac.*xyz)$"))
+ .containsExactly(
+ new ResourcePattern(ResourceType.CLUSTER, "xxx", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.CLUSTER, "yyy", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.CLUSTER, "bac.*xyz", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.TOPIC, "xxx", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.TOPIC, "yyy", PatternType.LITERAL),
+ new ResourcePattern(ResourceType.TOPIC, "bac.*xyz", PatternType.LITERAL)
+ );
+ }
+
+ @Test
+ public final void parsePrefixResource() {
+ assertThat(ResourcePatternParser.parse("^Topic:(topic.(.*))$"))
+ .containsExactly(
+ new ResourcePattern(ResourceType.TOPIC, "topic.", PatternType.PREFIXED)
+ );
+ }
+
+ @Test
+ public final void parseMultiplePrefixResource() {
+ assertThat(ResourcePatternParser.parse("^Topic:(topic1.(.*)|topic2.(.*))$"))
+ .containsExactly(
+ new ResourcePattern(ResourceType.TOPIC, "topic1.", PatternType.PREFIXED),
+ new ResourcePattern(ResourceType.TOPIC, "topic2.", PatternType.PREFIXED)
+ );
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "^(AAA|BBB|CCC|DDD)",
+ "(AAA|BBB|CCC|DDD)$",
+ "^(AAA|BBB|CCC|DDD",
+ "AAA|BBB|CCC|DDD)$",
+ "^Cluster$",
+ "^Cluster:$",
+ "^:xxx$",
+ })
+ public final void parseResourcePatternInvalid(final String pattern) {
+ assertThat(ResourcePatternParser.parse(pattern))
+ .isEmpty();
+ }
+
+ @Test
+ public final void parseResourcePatternNull() {
+ assertThat(ResourcePatternParser.parse(null))
+ .isEmpty();
+ }
+}
diff --git a/src/test/java/io/aiven/kafka/auth/nativeacls/ResourceTypeParserTest.java b/src/test/java/io/aiven/kafka/auth/nativeacls/ResourceTypeParserTest.java
new file mode 100644
index 0000000..89fa07c
--- /dev/null
+++ b/src/test/java/io/aiven/kafka/auth/nativeacls/ResourceTypeParserTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2022 Aiven Oy https://aiven.io
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.auth.nativeacls;
+
+import org.apache.kafka.common.resource.ResourceType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ResourceTypeParserTest {
+ @Test
+ public final void parseResourceTypeSingle() {
+ assertThat(ResourceTypeParser.parse("Topic"))
+ .containsExactly(ResourceType.TOPIC);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"^(.*)$", "^.*$"})
+ public final void parseResourceTypeGlobalWildcard(final String value) {
+ assertThat(ResourceTypeParser.parse(value))
+ .containsExactly(
+ ResourceType.TOPIC,
+ ResourceType.GROUP,
+ ResourceType.CLUSTER,
+ ResourceType.TRANSACTIONAL_ID,
+ ResourceType.DELEGATION_TOKEN
+ );
+ }
+
+ @Test
+ public final void parseResourceTypeUnknown() {
+ assertThat(ResourceTypeParser.parse("Some"))
+ .containsExactly(ResourceType.UNKNOWN);
+ }
+
+ @Test
+ public final void parseResourceTypeNull() {
+ assertThat(ResourceTypeParser.parse(null))
+ .isEmpty();
+ }
+
+}
diff --git a/src/test/resources/test_acls_for_acls_method.json b/src/test/resources/test_acls_for_acls_method.json
new file mode 100644
index 0000000..0381bce
--- /dev/null
+++ b/src/test/resources/test_acls_for_acls_method.json
@@ -0,0 +1,38 @@
+[
+ {
+ "operation": "^(Alter|AlterConfigs|Delete|Read|Write)$",
+ "principal": "^(test\\-user)$",
+ "principal_type": "User",
+ "resource": "^Topic:(xxx)$"
+ },
+ {
+ "operation": "^(Describe|DescribeConfigs)$",
+ "principal": "^(test\\-user)$",
+ "principal_type": "User",
+ "resource": "^Topic:(.*)$"
+ },
+ {
+ "operation": "^Read$",
+ "principal": "^(test\\-user)$",
+ "principal_type": "User",
+ "resource": "^Topic:(prefix\\.(.*))$"
+ },
+ {
+ "operation": "^Create$",
+ "principal": "^(test\\-admin)$",
+ "principal_type": "User",
+ "resource": "^Topic:(.*)$"
+ },
+ {
+ "operation": "^Delete$",
+ "principal": "^(test\\-admin)$",
+ "principal_type": "User",
+ "resource": "^Topic:(.*)$"
+ },
+ {
+ "operation": "^(.*)$",
+ "principal": "^(.*)$",
+ "principal_type": "Service",
+ "resource": "^(.*)$"
+ }
+]