Skip to content

Commit

Permalink
Merge pull request #34 from aiven/anatolii/acl_authorizer_fix
Browse files Browse the repository at this point in the history
Aiven ACL authorizer refactoring
  • Loading branch information
ivanyu authored Aug 10, 2021
2 parents 983971f + 54fd782 commit 354b65f
Show file tree
Hide file tree
Showing 10 changed files with 486 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
java-version: 11

- name: Build with Gradle
run: ./gradlew build
run: ./gradlew build -i

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
2 changes: 1 addition & 1 deletion .github/workflows/master_push_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ jobs:
with:
java-version: 11
- name: Build with Gradle
run: ./gradlew build
run: ./gradlew build -i
2 changes: 1 addition & 1 deletion .github/workflows/pull_request_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
with:
java-version: 11
- name: Build with Gradle
run: ./gradlew build
run: ./gradlew build -i
- name: Publish Test Report
uses: mikepenz/action-junit-report@v2
with:
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ dependencies {

testImplementation "org.openjdk.jmh:jmh-core:1.28"
testImplementation "org.openjdk.jmh:jmh-generator-annprocess:1.28"
testImplementation "org.assertj:assertj-core:3.20.2"

jmh "org.apache.kafka:kafka_2.12:2.1.1"
}
Expand Down
1 change: 1 addition & 0 deletions checkstyle/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<!-- FIXME will remove after cleaning the code -->
<suppress checks="CyclomaticComplexity" files="AivenAclAuthorizer.java" />
<suppress checks="ClassFanOutComplexity" files="AivenAclAuthorizer.java" />
<suppress checks="ClassFanOutComplexity" files="AivenAclAuthorizerV2.java" />
<suppress checks="NPathComplexity" files="AivenAclAuthorizer.java" />
<suppress checks="MethodLength" files="AivenAclAuthorizerTest.java"/>
</suppressions>
17 changes: 14 additions & 3 deletions src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class AivenAclAuthorizerConfig extends AbstractConfig {
private static final String CONFIGURATION_CONF = "aiven.acl.authorizer.configuration";
private static final String AUDITOR_CLASS_NAME_CONF = "aiven.acl.authorizer.auditor.class.name";
private static final String LOG_DENIALS_CONF = "aiven.acl.authorizer.log.denials";
private static final String CONFIG_REFRESH_CONF = "aiven.acl.authorizer.config.refresh.interval";

public AivenAclAuthorizerConfig(final Map<?, ?> originals) {
super(configDef(), originals);
Expand All @@ -54,18 +55,28 @@ public static ConfigDef configDef() {
true,
ConfigDef.Importance.LOW,
"Whether to log denials on INFO level"
).define(
CONFIG_REFRESH_CONF,
ConfigDef.Type.INT,
10_000,
ConfigDef.Importance.LOW,
"The interval between ACL reloads"
);
}

public final File getConfigFile() {
public File getConfigFile() {
return new File(getString(CONFIGURATION_CONF));
}

public final AuditorAPI getAuditor() {
public AuditorAPI getAuditor() {
return getConfiguredInstance(AUDITOR_CLASS_NAME_CONF, AuditorAPI.class);
}

public final boolean logDenials() {
public boolean logDenials() {
return getBoolean(LOG_DENIALS_CONF);
}

public int configRefreshInterval() {
return getInt(CONFIG_REFRESH_CONF);
}
}
227 changes: 227 additions & 0 deletions src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Copyright 2019 Aiven Oy https://aiven.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.auth;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.common.security.auth.KafkaPrincipal;

import io.aiven.kafka.auth.audit.AuditorAPI;
import io.aiven.kafka.auth.json.AivenAcl;
import io.aiven.kafka.auth.json.reader.AclJsonReader;
import io.aiven.kafka.auth.json.reader.JsonReaderException;

import kafka.network.RequestChannel.Session;
import kafka.security.auth.Acl;
import kafka.security.auth.Authorizer;
import kafka.security.auth.Operation;
import kafka.security.auth.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AivenAclAuthorizerV2 implements Authorizer {

private static final Logger LOGGER = LoggerFactory.getLogger(AivenAclAuthorizerV2.class);
private File configFile;
private AuditorAPI auditor;
private boolean logDenials;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final WatchService watchService;
private final AtomicReference<VerdictCache> cacheReference = new AtomicReference<>();

public AivenAclAuthorizerV2() {
try {
watchService = FileSystems.getDefault().newWatchService();
} catch (final IOException e) {
LOGGER.error("Failed to initialize WatchService", e);
throw new RuntimeException(e);
}
}

@Override
public void configure(final java.util.Map<String, ?> configs) {
final AivenAclAuthorizerConfig config = new AivenAclAuthorizerConfig(configs);

auditor = config.getAuditor();
logDenials = config.logDenials();

configFile = config.getConfigFile();
final AclJsonReader jsonReader = new AclJsonReader(configFile.toPath());
cacheReference.set(VerdictCache.create(loadAcls(jsonReader)));
final AtomicReference<WatchKey> watchKeyReference = new AtomicReference<>(subscribeToAclChanges(configFile));
scheduledExecutorService.scheduleWithFixedDelay(() -> {
final WatchKey watchKey = watchKeyReference.get();
if (watchKey != null) {
final List<WatchEvent<?>> watchEvents = watchKey.pollEvents();

watchEvents.stream().filter(watchEvent -> {
@SuppressWarnings("unchecked")
final Path path = ((WatchEvent<Path>) watchEvent).context();
return configFile.toPath().getFileName().equals(path);
}).findFirst().ifPresent(watchEvent -> {
LOGGER.info("{}: {}, Modified: {}",
watchEvent.kind(), watchEvent.context(), configFile.lastModified());
cacheReference.set(VerdictCache.create(loadAcls(jsonReader)));
});
if (!watchKey.reset()) {
watchKeyReference.compareAndSet(watchKey, subscribeToAclChanges(configFile));
}
} else {
watchKeyReference.set(subscribeToAclChanges(configFile));
}
}, 0, config.configRefreshInterval(), TimeUnit.MILLISECONDS);
}

private WatchKey subscribeToAclChanges(final File configFile) {
try {
return configFile.toPath().toAbsolutePath().getParent()
.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE);
} catch (final IOException e) {
LOGGER.error("Failed to subscribe to ACL configuration changes", e);
return null;
}
}

@Override
public void close() {
auditor.stop();
scheduledExecutorService.shutdownNow();
try {
watchService.close();
} catch (final IOException e) {
LOGGER.error("Failed to stop watch service", e);
}
}

@Override
public boolean authorize(final Session session,
final Operation operation,
final Resource resource) {
final KafkaPrincipal principal =
Objects.requireNonNullElse(session.principal(), KafkaPrincipal.ANONYMOUS);
final String resourceToCheck =
resource.resourceType() + ":" + resource.name();
final boolean verdict =
checkAcl(
principal.getPrincipalType(),
principal.getName(),
operation.name(),
resourceToCheck
);
auditor.addActivity(session, operation, resource, verdict);
return verdict;
}


/**
* Read ACL entries from config file.
*/
private List<AivenAcl> loadAcls(final AclJsonReader jsonReader) {
LOGGER.info("Reloading ACL configuration...");
try {
return jsonReader.read();
} catch (final JsonReaderException ex) {
LOGGER.error("Failed to load ACL config file", ex);
return Collections.emptyList();
}
}

/**
* Authorize a single request.
*/
private boolean checkAcl(final String principalType,
final String principalName,
final String operation,
final String resource) {

final boolean verdict = cacheReference.get().get(principalType, principalName, operation, resource);
logAuthVerdict(verdict, operation, resource, principalType, principalName);
return verdict;
}

private void logAuthVerdict(final boolean verdict,
final String operation,
final String resource,
final String principalType,
final String principalName) {
if (verdict) {
LOGGER.debug("[ALLOW] Auth request {} on {} by {} {}",
operation, resource, principalType, principalName);
} else {
if (logDenials) {
LOGGER.info("[DENY] Auth request {} on {} by {} {}",
operation, resource, principalType, principalName);
} else {
LOGGER.debug("[DENY] Auth request {} on {} by {} {}",
operation, resource, principalType, principalName);
}
}
}

@Override
public scala.collection.immutable.Set<Acl> getAcls(final Resource resource) {
LOGGER.error("getAcls(Resource) is not implemented");
return new scala.collection.immutable.HashSet<>();
}

@Override
public scala.collection.immutable.Map<Resource, scala.collection.immutable.Set<Acl>> getAcls(
final KafkaPrincipal principal) {
LOGGER.error("getAcls(KafkaPrincipal) is not implemented");
return new scala.collection.immutable.HashMap<>();
}

@Override
public scala.collection.immutable.Map<Resource, scala.collection.immutable.Set<Acl>> getAcls() {
LOGGER.error("getAcls() is not implemented");
return new scala.collection.immutable.HashMap<>();
}

@Override
public boolean removeAcls(final scala.collection.immutable.Set<Acl> acls,
final Resource resource) {
LOGGER.error("removeAcls(Set<Acl>, Resource) is not implemented");
return false;
}

@Override
public boolean removeAcls(final Resource resource) {
LOGGER.error("removeAcls(Resource) is not implemented");
return false;
}

@Override
public void addAcls(final scala.collection.immutable.Set<Acl> acls,
final Resource resource) {
LOGGER.error("addAcls(Set<Acl>, Resource) is not implemented");
}
}
49 changes: 49 additions & 0 deletions src/main/java/io/aiven/kafka/auth/VerdictCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2021 Aiven Oy https://aiven.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.auth;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.aiven.kafka.auth.json.AivenAcl;

public class VerdictCache {
private final List<AivenAcl> aclEntries;
private final Map<String, Boolean> cache = new ConcurrentHashMap<>();

private VerdictCache(final List<AivenAcl> aclEntries) {
this.aclEntries = aclEntries;
}

public boolean get(final String principalType,
final String principalName,
final String operation,
final String resource) {
if (aclEntries != null) {
final String cacheKey = resource + "|" + operation + "|" + principalName + "|" + principalType;
return cache.computeIfAbsent(cacheKey, key -> aclEntries.stream()
.anyMatch(aclEntry -> aclEntry.check(principalType, principalName, operation, resource)));
} else {
return false;
}
}

public static VerdictCache create(final List<AivenAcl> aclEntries) {
return new VerdictCache(aclEntries);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,27 @@ void correctFullConfig() {
userActivityProps.put("aiven.acl.authorizer.auditor.class.name", UserActivityAuditor.class.getName());
userActivityProps.put("aiven.acl.authorizer.auditor.aggregation.period", "123");
userActivityProps.put("aiven.acl.authorizer.log.denials", "false");
userActivityProps.put("aiven.acl.authorizer.config.refresh.interval", "10");

var config = new AivenAclAuthorizerConfig(userActivityProps);
assertEquals("/test", config.getConfigFile().getAbsolutePath());
assertEquals(UserActivityAuditor.class, config.getAuditor().getClass());
assertFalse(config.logDenials());
assertEquals(10, config.configRefreshInterval());

final Map<String, String> userActivityOpsProps = new HashMap<>();
userActivityOpsProps.put("aiven.acl.authorizer.configuration", "/test");
userActivityOpsProps.put("aiven.acl.authorizer.auditor.class.name",
UserOperationsActivityAuditor.class.getName());
userActivityOpsProps.put("aiven.acl.authorizer.auditor.aggregation.period", "123");
userActivityOpsProps.put("aiven.acl.authorizer.log.denials", "false");
userActivityOpsProps.put("aiven.acl.authorizer.config.refresh.interval", "10");

config = new AivenAclAuthorizerConfig(userActivityOpsProps);
assertEquals("/test", config.getConfigFile().getAbsolutePath());
assertEquals(UserOperationsActivityAuditor.class, config.getAuditor().getClass());
assertFalse(config.logDenials());
assertEquals(10, config.configRefreshInterval());
}

@Test
Expand Down
Loading

0 comments on commit 354b65f

Please sign in to comment.