Skip to content

Commit

Permalink
Merge pull request #8 from aiven/auth-fix
Browse files Browse the repository at this point in the history
Fix problem with authorization
  • Loading branch information
ivanyu authored Apr 15, 2020
2 parents 592200b + 8c9ce72 commit b2380f1
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 29 deletions.
1 change: 1 addition & 0 deletions checkstyle/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
<suppress checks="CyclomaticComplexity" files="AivenAclAuthorizer.java" />
<suppress checks="ClassFanOutComplexity" files="AivenAclAuthorizer.java" />
<suppress checks="NPathComplexity" files="AivenAclAuthorizer.java" />
<suppress checks="MethodLength" files="AivenAclAuthorizerTest.java"/>
</suppressions>
39 changes: 21 additions & 18 deletions src/main/java/io/aiven/kafka/auth/AivenAclAuthorizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -103,31 +104,33 @@ public void close() {

@Override
public boolean authorize(final Session session,
final Operation operationObj,
final Resource resourceObj) {
KafkaPrincipal principal = session.principal();
if (principal == null) {
principal = KafkaPrincipal.ANONYMOUS;
}

final String principalName = principal.getName();
final String principalType = principal.getPrincipalType();
final String operation = operationObj.name();
final String resource = resourceObj.resourceType() + ":" + resourceObj.name();

final boolean verdict = checkAcl(principalName, principalType, operation, resource);
auditor.addActivity(session, operationObj, resourceObj, verdict);
final Operation operation,
final Resource resource) {
final KafkaPrincipal principal =
Objects.nonNull(session.principal())
? session.principal()
: KafkaPrincipal.ANONYMOUS;
final String resourceToCheck =
resource.resourceType() + ":" + resource.name();
final boolean verdict =
checkAcl(
principal.getPrincipalType(),
principal.getName(),
operation.name(),
resourceToCheck
);
auditor.addActivity(session, operation, resource, verdict);
return verdict;
}

/**
* Authorize a single request.
*/
//FIXME split code here in functions !!!!!
public boolean checkAcl(final String principalType,
final String principalName,
final String operation,
final String resource) {
private boolean checkAcl(final String principalType,
final String principalName,
final String operation,
final String resource) {
final long now = System.nanoTime() / 1000000; // nanoTime is monotonic, convert to milliseconds
boolean verdict = false;
String cacheKey = null;
Expand Down
172 changes: 161 additions & 11 deletions src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerTest.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

import io.aiven.kafka.auth.AivenAclAuthorizer;

import kafka.network.RequestChannel.Session;
import kafka.security.auth.Operation;
import kafka.security.auth.Resource;
import kafka.security.auth.ResourceType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand Down Expand Up @@ -65,11 +74,76 @@ public void testAivenAclAuthorizer() throws IOException {
auth.configure(configs);

// basic ACL checks
assertTrue(auth.checkAcl("User", "pass", "Read", "Topic:Target"));
assertFalse(auth.checkAcl("User", "fail", "Read", "Topic:Target"));
assertFalse(auth.checkAcl("User", "pass", "Read", "Fail:Target"));
assertFalse(auth.checkAcl("User", "pass", "FailRead", "Topic:Target"));
assertFalse(auth.checkAcl("NonUser", "pass", "Read", "Topic:Target"));
assertTrue(
auth.authorize(
new Session(
new KafkaPrincipal("User", "pass"),
InetAddress.getLocalHost()
),
Operation.fromJava(AclOperation.READ),
new Resource(
ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.TOPIC),
"Target",
PatternType.LITERAL
)
)
);
assertFalse(
auth.authorize(
new Session(
new KafkaPrincipal("User", "fail"),
InetAddress.getLocalHost()
),
Operation.fromJava(AclOperation.READ),
new Resource(
ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.TOPIC),
"Target",
PatternType.LITERAL
)
)
);
assertFalse(
auth.authorize(
new Session(
new KafkaPrincipal("User", "pass"),
InetAddress.getLocalHost()
),
Operation.fromJava(AclOperation.READ),
new Resource(
ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.GROUP),
"Target",
PatternType.LITERAL
)
)
);
assertFalse(
auth.authorize(
new Session(
new KafkaPrincipal("User", "pass"),
InetAddress.getLocalHost()
),
Operation.fromJava(AclOperation.CREATE),
new Resource(
ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.TOPIC),
"Target",
PatternType.LITERAL
)
)
);
assertFalse(
auth.authorize(
new Session(
new KafkaPrincipal("NonUser", "pass"),
InetAddress.getLocalHost()
),
Operation.fromJava(AclOperation.READ),
new Resource(
ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.TOPIC),
"Target",
PatternType.LITERAL
)
)
);

// reload logic
assertFalse(auth.reloadAcls());
Expand All @@ -82,20 +156,96 @@ public void testAivenAclAuthorizer() throws IOException {
aclJson.setLastModified(aclJson.lastModified() + 20000);
assertTrue(auth.reloadAcls());

assertTrue(auth.checkAcl("User", "pass", "Read", "Topic:Target"));
assertTrue(auth.checkAcl("NonUser", "pass", "Read", "Topic:Target"));
assertTrue(auth.authorize(
new Session(
new KafkaPrincipal("User", "pass"),
InetAddress.getLocalHost()
),
Operation.fromJava(AclOperation.READ),
new Resource(
ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.TOPIC),
"Target",
PatternType.LITERAL
)
));
assertTrue(
auth.authorize(
new Session(
new KafkaPrincipal("NonUser", "pass"),
InetAddress.getLocalHost()
),
Operation.fromJava(AclOperation.READ),
new Resource(
ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.TOPIC),
"Target",
PatternType.LITERAL
)
)
);

// Longer configs trigger caching of results
Files.write(configFilePath, ACL_JSON_LONG.getBytes());
aclJson.setLastModified(aclJson.lastModified() + 30000);
assertTrue(auth.reloadAcls());

// first iteration without cache
assertTrue(auth.checkAcl("User", "pass-1", "Read", "Topic:Target"));
assertFalse(auth.checkAcl("User", "fail-1", "Read", "Topic:Target"));
assertTrue(
auth.authorize(
new Session(
new KafkaPrincipal("User", "pass-1"),
InetAddress.getLocalHost()
),
Operation.fromJava(AclOperation.READ),
new Resource(
ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.TOPIC),
"Target",
PatternType.LITERAL
)
)
);
assertFalse(
auth.authorize(
new Session(
new KafkaPrincipal("User", "fail-1"),
InetAddress.getLocalHost()
),
Operation.fromJava(AclOperation.READ),
new Resource(
ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.TOPIC),
"Target",
PatternType.LITERAL
)
)
);

// second iteration from cache
assertTrue(auth.checkAcl("User", "pass-1", "Read", "Topic:Target"));
assertFalse(auth.checkAcl("User", "fail-1", "Read", "Topic:Target"));
assertTrue(
auth.authorize(
new Session(
new KafkaPrincipal("User", "pass-1"),
InetAddress.getLocalHost()
),
Operation.fromJava(AclOperation.READ),
new Resource(
ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.TOPIC),
"Target",
PatternType.LITERAL
)
)
);
assertFalse(
auth.authorize(
new Session(
new KafkaPrincipal("User", "fail-1"),
InetAddress.getLocalHost()
),
Operation.fromJava(AclOperation.READ),
new Resource(
ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.TOPIC),
"Target",
PatternType.LITERAL
)
)
);
}
}
7 changes: 7 additions & 0 deletions src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,10 @@ log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n


log4j.appender.audit=org.apache.log4j.ConsoleAppender
log4j.appender.audit.layout=org.apache.log4j.PatternLayout
log4j.appender.audit.layout.ConversionPattern=[%d] AUDIT: %m%n

log4j.logger.aiven.auditor.logger=INFO, audit

0 comments on commit b2380f1

Please sign in to comment.