Skip to content

Commit

Permalink
Switch AivenAclAuthorizerV2 to new API
Browse files Browse the repository at this point in the history
Some notable changes:
1. New API classes require some conversions back and forth. Some conversion logic is taken from `AuthorizerWrapper`.
2. `configure` and `start` methods. The split of the old `configure` is trivial.
3. Some actions are non-loggable (they are just checks, see `Action`'s Javadoc), hence new parameters in `logAuthVerdict`.
  • Loading branch information
ivanyu committed Sep 16, 2021
1 parent 957c172 commit 7f12e6a
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 90 deletions.
1 change: 1 addition & 0 deletions checkstyle/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
<suppress checks="ClassFanOutComplexity" files="AivenAclAuthorizerV2.java" />
<suppress checks="NPathComplexity" files="AivenAclAuthorizer.java" />
<suppress checks="MethodLength" files="AivenAclAuthorizerTest.java"/>
<suppress checks="ClassFanOutComplexity" files="AivenAclAuthorizerV2Test.java"/>
</suppressions>
140 changes: 85 additions & 55 deletions src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,43 @@
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;

import io.aiven.kafka.auth.audit.AuditorAPI;
import io.aiven.kafka.auth.json.AivenAcl;
import io.aiven.kafka.auth.json.reader.AclJsonReader;
import io.aiven.kafka.auth.json.reader.JsonReaderException;

import kafka.network.RequestChannel.Session;
import kafka.security.auth.Acl;
import kafka.security.auth.Authorizer;
import kafka.security.auth.Operation;
import kafka.security.auth.Resource;
import kafka.security.auth.ResourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("deprecation")
public class AivenAclAuthorizerV2 implements Authorizer {

private static final Logger LOGGER = LoggerFactory.getLogger(AivenAclAuthorizerV2.class);
Expand All @@ -58,6 +71,8 @@ public class AivenAclAuthorizerV2 implements Authorizer {
private final WatchService watchService;
private final AtomicReference<VerdictCache> cacheReference = new AtomicReference<>();

private AivenAclAuthorizerConfig config;

public AivenAclAuthorizerV2() {
try {
watchService = FileSystems.getDefault().newWatchService();
Expand All @@ -69,8 +84,12 @@ public AivenAclAuthorizerV2() {

@Override
public void configure(final java.util.Map<String, ?> configs) {
final AivenAclAuthorizerConfig config = new AivenAclAuthorizerConfig(configs);
config = new AivenAclAuthorizerConfig(configs);
}

@Override
public final Map<Endpoint, ? extends CompletionStage<Void>> start(
final AuthorizerServerInfo serverInfo) {
auditor = config.getAuditor();
logDenials = config.logDenials();

Expand Down Expand Up @@ -99,6 +118,13 @@ public void configure(final java.util.Map<String, ?> configs) {
watchKeyReference.set(subscribeToAclChanges(configFile));
}
}, 0, config.configRefreshInterval(), TimeUnit.MILLISECONDS);

// These futures are just placeholders.
return serverInfo.endpoints().stream()
.collect(Collectors.toMap(
endpoint -> endpoint,
endpoint -> CompletableFuture.completedFuture(null)
));
}

private WatchKey subscribeToAclChanges(final File configFile) {
Expand All @@ -124,24 +150,40 @@ public void close() {
}

@Override
public boolean authorize(final Session session,
final Operation operation,
final Resource resource) {
public final List<AuthorizationResult> authorize(final AuthorizableRequestContext requestContext,
final List<Action> actions) {
final KafkaPrincipal principal =
Objects.requireNonNullElse(session.principal(), KafkaPrincipal.ANONYMOUS);
final String resourceToCheck =
resource.resourceType() + ":" + resource.name();
final boolean verdict =
checkAcl(
principal.getPrincipalType(),
principal.getName(),
operation.name(),
resourceToCheck
);
auditor.addActivity(session, operation, resource, verdict);
return verdict;
}
Objects.requireNonNullElse(requestContext.principal(), KafkaPrincipal.ANONYMOUS);
final List<AuthorizationResult> result = new ArrayList<>(actions.size());
for (final Action action : actions) {
// Some string conversions are done inside.
final var operation = Operation.fromJava(action.operation());
final var resourceType = ResourceType.fromJava(action.resourcePattern().resourceType());
final String resourceToCheck =
resourceType + ":" + action.resourcePattern().name();
final boolean verdict =
checkAcl(
principal.getPrincipalType(),
principal.getName(),
operation.name(),
resourceToCheck,
action.logIfAllowed(),
action.logIfDenied()
);

// When we finally drop the old API, we can change the auditor API,
// so it doesn't require these conversions.
final var session = new Session(principal, requestContext.clientAddress());
final var resource = new Resource(
resourceType,
action.resourcePattern().name(),
action.resourcePattern().patternType());
auditor.addActivity(session, operation, resource, verdict);

result.add(verdict ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED);
}
return result;
}

/**
* Read ACL entries from config file.
Expand All @@ -162,22 +204,26 @@ private List<AivenAcl> loadAcls(final AclJsonReader jsonReader) {
private boolean checkAcl(final String principalType,
final String principalName,
final String operation,
final String resource) {

final String resource,
final boolean actionLogIfAllowed,
final boolean actionLogIfDenied) {
final boolean verdict = cacheReference.get().get(principalType, principalName, operation, resource);
logAuthVerdict(verdict, operation, resource, principalType, principalName);
logAuthVerdict(verdict, operation, resource, principalType, principalName,
actionLogIfAllowed, actionLogIfDenied);
return verdict;
}

private void logAuthVerdict(final boolean verdict,
final String operation,
final String resource,
final String principalType,
final String principalName) {
if (verdict) {
final String principalName,
final boolean actionLogIfAllowed,
final boolean actionLogIfDenied) {
if (verdict && actionLogIfAllowed) {
LOGGER.debug("[ALLOW] Auth request {} on {} by {} {}",
operation, resource, principalType, principalName);
} else {
} else if (actionLogIfDenied) {
if (logDenials) {
LOGGER.info("[DENY] Auth request {} on {} by {} {}",
operation, resource, principalType, principalName);
Expand All @@ -189,40 +235,24 @@ private void logAuthVerdict(final boolean verdict,
}

@Override
public scala.collection.immutable.Set<Acl> getAcls(final Resource resource) {
LOGGER.error("getAcls(Resource) is not implemented");
return new scala.collection.immutable.HashSet<>();
}

@Override
public scala.collection.immutable.Map<Resource, scala.collection.immutable.Set<Acl>> getAcls(
final KafkaPrincipal principal) {
LOGGER.error("getAcls(KafkaPrincipal) is not implemented");
return new scala.collection.immutable.HashMap<>();
}

@Override
public scala.collection.immutable.Map<Resource, scala.collection.immutable.Set<Acl>> getAcls() {
LOGGER.error("getAcls() is not implemented");
return new scala.collection.immutable.HashMap<>();
}

@Override
public boolean removeAcls(final scala.collection.immutable.Set<Acl> acls,
final Resource resource) {
LOGGER.error("removeAcls(Set<Acl>, Resource) is not implemented");
return false;
public final List<? extends CompletionStage<AclCreateResult>> createAcls(
final AuthorizableRequestContext requestContext,
final List<AclBinding> aclBindings) {
LOGGER.error("`createAcls` is not implemented");
return List.of();
}

@Override
public boolean removeAcls(final Resource resource) {
LOGGER.error("removeAcls(Resource) is not implemented");
return false;
public final List<? extends CompletionStage<AclDeleteResult>> deleteAcls(
final AuthorizableRequestContext requestContext,
final List<AclBindingFilter> aclBindingFilters) {
LOGGER.error("`deleteAcls` is not implemented");
return List.of();
}

@Override
public void addAcls(final scala.collection.immutable.Set<Acl> acls,
final Resource resource) {
LOGGER.error("addAcls(Set<Acl>, Resource) is not implemented");
public final Iterable<AclBinding> acls(final AclBindingFilter filter) {
LOGGER.error("`acls` is not implemented");
return List.of();
}
}
Loading

0 comments on commit 7f12e6a

Please sign in to comment.