diff --git a/src/main/java/io/aiven/kafka/auth/audit/Auditor.java b/src/main/java/io/aiven/kafka/auth/audit/Auditor.java index 1ed7819..b04ba22 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/Auditor.java +++ b/src/main/java/io/aiven/kafka/auth/audit/Auditor.java @@ -34,9 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.aiven.kafka.auth.audit.AuditorConfig.AggregationGrouping.PRINCIPAL; -import static io.aiven.kafka.auth.audit.AuditorConfig.AggregationGrouping.PRINCIPAL_AND_SOURCE_IP; - public abstract class Auditor implements AuditorAPI { private final Logger logger; @@ -45,10 +42,9 @@ public abstract class Auditor implements AuditorAPI { private final Lock auditLock = new ReentrantLock(); - private final ScheduledExecutorService auditScheduler = - Executors.newScheduledThreadPool(1); + private final ScheduledExecutorService auditScheduler = Executors.newScheduledThreadPool(1); - private AuditorDumpFormatter formatter; + protected AuditorConfig auditorConfig; public Auditor() { this(LoggerFactory.getLogger("aiven.auditor.logger")); @@ -61,53 +57,39 @@ protected Auditor(final Logger logger) { @Override public void configure(final Map configs) { - final AuditorConfig auditorConfig = new AuditorConfig(configs); + auditorConfig = new AuditorConfig(configs); auditScheduler.scheduleAtFixedRate( this::dump, auditorConfig.getAggregationPeriodInSeconds(), auditorConfig.getAggregationPeriodInSeconds(), TimeUnit.SECONDS ); - - formatter = createFormatter(auditorConfig); - } - - private AuditorDumpFormatter createFormatter(final AuditorConfig auditorConfig) { - final String grouping = auditorConfig.getAggregationGrouping(); - if (PRINCIPAL_AND_SOURCE_IP.getConfigValue().equals(grouping)) { - return new PrincipalAndIpFormatter(); - } else if (PRINCIPAL.getConfigValue().equals(grouping)) { - return new PrincipalFormatter(); - } else { - throw new RuntimeException("Not implemented"); - } } @Override - public void addActivity(final Session session, - final Operation operation, - final Resource resource, - final boolean hasAccess) { - - final AuditKey auditKey = AuditKey.fromSession(session); + public final void addActivity(final Session session, + final Operation operation, + final Resource resource, + final boolean hasAccess) { auditLock.lock(); try { - auditStorage.compute(auditKey, (key, userActivity) -> - onUserActivity(Objects.isNull(userActivity) - ? new UserActivity() - : userActivity, operation, resource, hasAccess) - ); + addActivity0(session, operation, resource, hasAccess); } finally { auditLock.unlock(); } } + protected abstract void addActivity0(final Session session, + final Operation operation, + final Resource resource, + final boolean hasAccess); + @Override public void stop() { + dump(); auditScheduler.shutdownNow(); try { auditScheduler.awaitTermination(5, TimeUnit.SECONDS); - dump(); } catch (final InterruptedException e) { // Intentionally ignored } @@ -115,10 +97,9 @@ public void stop() { protected void dump() { try { - formatter.format(makeDump()) - .forEach(logger::info); + createFormatter().format(makeDump()).forEach(logger::info); } catch (final Exception e) { - // handle or the background thread can die! + logger.warn("Couldn't dump messages", e); } } @@ -134,10 +115,7 @@ private Map makeDump() { return auditStorageDump; } - protected abstract UserActivity onUserActivity(final UserActivity userActivity, - final Operation operation, - final Resource resource, - final Boolean hasAccess); + protected abstract AuditorDumpFormatter createFormatter(); protected static class AuditKey { @@ -168,8 +146,5 @@ public int hashCode() { return Objects.hash(principal, sourceIp); } - public static AuditKey fromSession(final Session session) { - return new AuditKey(session.principal(), session.clientAddress()); - } } } diff --git a/src/main/java/io/aiven/kafka/auth/audit/AuditorConfig.java b/src/main/java/io/aiven/kafka/auth/audit/AuditorConfig.java index e7a0f0f..d1250b3 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/AuditorConfig.java +++ b/src/main/java/io/aiven/kafka/auth/audit/AuditorConfig.java @@ -21,8 +21,8 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import static io.aiven.kafka.auth.audit.AuditorConfig.AggregationGrouping.PRINCIPAL; -import static io.aiven.kafka.auth.audit.AuditorConfig.AggregationGrouping.PRINCIPAL_AND_SOURCE_IP; +import static io.aiven.kafka.auth.audit.AuditorConfig.AggregationGrouping.USER; +import static io.aiven.kafka.auth.audit.AuditorConfig.AggregationGrouping.USER_AND_IP; public class AuditorConfig extends AbstractConfig { @@ -30,8 +30,8 @@ public class AuditorConfig extends AbstractConfig { static final String AGGREGATION_GROUPING_CONF = "aiven.acl.authorizer.auditor.aggregation.grouping"; public enum AggregationGrouping { - PRINCIPAL("principal"), - PRINCIPAL_AND_SOURCE_IP("principal_and_source_ip"); + USER("user"), + USER_AND_IP("user_and_ip"); private final String configValue; @@ -42,6 +42,16 @@ public enum AggregationGrouping { public String getConfigValue() { return configValue; } + + public static AggregationGrouping fromConfigValue(final String configValue) { + for (final var ag : values()) { + if (ag.configValue.equals(configValue)) { + return ag; + } + } + throw new IllegalArgumentException("Unsupported aggregation grouping: " + configValue); + } + } public AuditorConfig(final Map originals) { @@ -60,9 +70,9 @@ public static ConfigDef configDef() { ).define( AGGREGATION_GROUPING_CONF, ConfigDef.Type.STRING, - PRINCIPAL_AND_SOURCE_IP.getConfigValue(), - ConfigDef.ValidString.in(PRINCIPAL.getConfigValue(), - PRINCIPAL_AND_SOURCE_IP.getConfigValue()), + USER_AND_IP.getConfigValue(), + ConfigDef.ValidString.in(USER.getConfigValue(), + USER_AND_IP.getConfigValue()), ConfigDef.Importance.HIGH, "The auditor aggregation grouping key." ); @@ -72,7 +82,7 @@ public long getAggregationPeriodInSeconds() { return getLong(AGGREGATION_PERIOD_CONF); } - public String getAggregationGrouping() { - return getString(AGGREGATION_GROUPING_CONF); + public AggregationGrouping getAggregationGrouping() { + return AggregationGrouping.fromConfigValue(getString(AGGREGATION_GROUPING_CONF)); } } diff --git a/src/main/java/io/aiven/kafka/auth/audit/AuditorDumpFormatter.java b/src/main/java/io/aiven/kafka/auth/audit/AuditorDumpFormatter.java index 45643f6..daeaa3e 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/AuditorDumpFormatter.java +++ b/src/main/java/io/aiven/kafka/auth/audit/AuditorDumpFormatter.java @@ -35,4 +35,12 @@ public interface AuditorDumpFormatter { static DateTimeFormatter dateFormatter() { return DateTimeFormatter.ISO_INSTANT; } + + default String formatUserOperation(final UserOperation userOperation) { + return (userOperation.hasAccess ? "Allow" : "Deny") + + " " + userOperation.operation.name() + " on " + + userOperation.resource.resourceType() + ":" + + userOperation.resource.name(); + } + } diff --git a/src/main/java/io/aiven/kafka/auth/audit/PrincipalAndIpFormatter.java b/src/main/java/io/aiven/kafka/auth/audit/PrincipalAndIpFormatter.java index d499345..9f2dc1f 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/PrincipalAndIpFormatter.java +++ b/src/main/java/io/aiven/kafka/auth/audit/PrincipalAndIpFormatter.java @@ -25,37 +25,29 @@ * of each principal. */ public class PrincipalAndIpFormatter implements AuditorDumpFormatter { - PrincipalAndIpFormatter() { - } @Override public List format(final Map dump) { return dump.entrySet().stream() - .map(e -> auditMessagePrincipalAndSourceIp(e.getKey(), e.getValue())) + .map(e -> buildAuditMessage(e.getKey(), e.getValue())) .collect(Collectors.toList()); } - private String auditMessagePrincipalAndSourceIp(final Auditor.AuditKey key, final UserActivity userActivity) { + private String buildAuditMessage(final Auditor.AuditKey key, final UserActivity userActivity) { + final var ua = (UserActivity.UserActivityOperations) userActivity; final StringBuilder auditMessage = new StringBuilder(key.principal.toString()); auditMessage .append(" (").append(key.sourceIp).append(")") .append(" was active since ") - .append(userActivity.activeSince.format(AuditorDumpFormatter.dateFormatter())); - if (userActivity.hasOperations()) { + .append(ua.activeSince.format(AuditorDumpFormatter.dateFormatter())); + if (!ua.operations.isEmpty()) { auditMessage.append(": ") - .append(userActivity - .operations - .stream() - .map(this::userOperationMessage) + .append(ua + .operations.stream() + .map(this::formatUserOperation) .collect(Collectors.joining(", "))); } return auditMessage.toString(); } - private String userOperationMessage(final UserOperation op) { - return (op.hasAccess ? "Allow" : "Deny") - + " " + op.operation.name() + " on " - + op.resource.resourceType() + ":" - + op.resource.name(); - } } diff --git a/src/main/java/io/aiven/kafka/auth/audit/PrincipalFormatter.java b/src/main/java/io/aiven/kafka/auth/audit/PrincipalFormatter.java index 3dad7bc..c9d62f4 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/PrincipalFormatter.java +++ b/src/main/java/io/aiven/kafka/auth/audit/PrincipalFormatter.java @@ -16,15 +16,10 @@ package io.aiven.kafka.auth.audit; -import java.net.InetAddress; -import java.time.ZonedDateTime; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.apache.kafka.common.security.auth.KafkaPrincipal; - /** * An {@link AuditorDumpFormatter} that creates one entry per principal, * having info of each IP address in that entry. @@ -32,60 +27,34 @@ public class PrincipalFormatter implements AuditorDumpFormatter { @Override public List format(final Map dump) { - return dump.keySet().stream() - .map(k -> k.principal) - .sorted(Comparator.comparing(KafkaPrincipal::toString)) - .distinct() - .map(principal -> { - final Map principalActivities = dump.entrySet().stream() - .filter(e -> e.getKey().principal.equals(principal)) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - - return auditMessagePrincipal(principal, principalActivities); - }) + return dump.entrySet().stream() + .map(e -> buildAuditMessage(e.getKey(), (UserActivity.UserActivityOperationsGropedByIP) e.getValue())) .collect(Collectors.toList()); } - private String auditMessagePrincipal(final KafkaPrincipal principal, - final Map principalActivities) { - final ZonedDateTime earliest = principalActivities.values().stream() - .map(ua -> ua.activeSince) - .sorted() - .findFirst() - .get(); - - final StringBuilder auditMessage = new StringBuilder(principal.toString()); - auditMessage + private String buildAuditMessage(final Auditor.AuditKey auditKey, + final UserActivity.UserActivityOperationsGropedByIP userActivity) { + final var auditMessage = new StringBuilder(auditKey.principal.toString()) .append(" was active since ") - .append(earliest.format(AuditorDumpFormatter.dateFormatter())) + .append(userActivity.activeSince.format(AuditorDumpFormatter.dateFormatter())) .append("."); - - final String allActivities = principalActivities.entrySet().stream() - .map(e -> { - final InetAddress sourceIp = e.getKey().sourceIp; - final UserActivity userActivity = e.getValue(); - final List operations = userActivity.operations.stream().map(op -> - (op.hasAccess ? "Allow" : "Deny") - + " " + op.operation.name() + " on " - + op.resource.resourceType() + ":" - + op.resource.name() - ).collect(Collectors.toList()); - - final StringBuilder sb = new StringBuilder(); - sb.append(sourceIp.toString()); - if (!operations.isEmpty()) { - sb.append(": "); - sb.append(String.join(", ", operations)); - } - return sb.toString(); - }) - .collect(Collectors.joining(", ")); - if (!allActivities.isBlank()) { + if (!userActivity.operations.isEmpty()) { auditMessage.append(" "); - auditMessage.append(allActivities); + auditMessage.append( + userActivity + .operations.entrySet().stream() + .map(e -> { + final var operations = new StringBuilder(); + operations.append(e.getKey()).append(": "); + operations.append( + e.getValue().stream() + .map(this::formatUserOperation) + .collect(Collectors.joining(", ")) + ); + return operations.toString(); + }).collect(Collectors.joining(", ")) + ); } - return auditMessage.toString(); } diff --git a/src/main/java/io/aiven/kafka/auth/audit/UserActivity.java b/src/main/java/io/aiven/kafka/auth/audit/UserActivity.java index c98f25a..ae209e8 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/UserActivity.java +++ b/src/main/java/io/aiven/kafka/auth/audit/UserActivity.java @@ -16,37 +16,28 @@ package io.aiven.kafka.auth.audit; +import java.net.InetAddress; import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.List; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; -class UserActivity { - public final ZonedDateTime activeSince; +abstract class UserActivity { - /** - * Ordered in the order the order the operations are added. - */ - public final List operations; + public final ZonedDateTime activeSince; - public UserActivity() { + protected UserActivity() { this(ZonedDateTime.now()); } - public UserActivity(final ZonedDateTime activeSince) { + protected UserActivity(final ZonedDateTime activeSince) { this.activeSince = activeSince; - this.operations = new ArrayList<>(); - } - - public void addOperation(final UserOperation userOperation) { - if (!operations.contains(userOperation)) { - operations.add(userOperation); - } } - public boolean hasOperations() { - return !operations.isEmpty(); - } + abstract void addOperation(final UserOperation userOperation); @Override public boolean equals(final Object o) { @@ -57,12 +48,59 @@ public boolean equals(final Object o) { return false; } final UserActivity that = (UserActivity) o; - return Objects.equals(activeSince, that.activeSince) - && Objects.equals(operations, that.operations); + return Objects.equals(activeSince, that.activeSince); } @Override public int hashCode() { - return Objects.hash(activeSince, operations); + return Objects.hash(activeSince); } + + static final class UserActivityOperations extends UserActivity { + + public UserActivityOperations() { + super(); + } + + public UserActivityOperations(final ZonedDateTime activeSince) { + super(activeSince); + } + + /** + * Ordered in the order the order the operations are added. + */ + public final Set operations = new LinkedHashSet<>(); + + @Override + void addOperation(final UserOperation userOperation) { + operations.add(userOperation); + } + + } + + static final class UserActivityOperationsGropedByIP extends UserActivity { + + public UserActivityOperationsGropedByIP() { + super(); + } + + public UserActivityOperationsGropedByIP(final ZonedDateTime activeSince) { + super(activeSince); + } + + public final Map> operations = new LinkedHashMap<>(); + + @Override + void addOperation(final UserOperation userOperation) { + final BiFunction, Set> resolveUserOperations = (ip, o) -> { + final var ops = + Objects.isNull(o) ? new LinkedHashSet() : o; + ops.add(userOperation); + return ops; + }; + operations.compute(userOperation.sourceIp, resolveUserOperations::apply); + } + + } + } diff --git a/src/main/java/io/aiven/kafka/auth/audit/UserActivityAuditor.java b/src/main/java/io/aiven/kafka/auth/audit/UserActivityAuditor.java index 32b24ab..3111ec2 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/UserActivityAuditor.java +++ b/src/main/java/io/aiven/kafka/auth/audit/UserActivityAuditor.java @@ -16,6 +16,12 @@ package io.aiven.kafka.auth.audit; +import java.util.Map; +import java.util.Objects; + +import org.apache.kafka.common.config.ConfigException; + +import kafka.network.RequestChannel; import kafka.security.auth.Operation; import kafka.security.auth.Resource; import org.slf4j.Logger; @@ -23,6 +29,16 @@ public class UserActivityAuditor extends Auditor { public UserActivityAuditor() { + super(); + } + + @Override + public void configure(final Map configs) { + super.configure(configs); + if (auditorConfig.getAggregationGrouping() == AuditorConfig.AggregationGrouping.USER) { + throw new ConfigException("Grouping by " + AuditorConfig.AggregationGrouping.USER.getConfigValue() + + " is not supported for this type of auditor"); + } } protected UserActivityAuditor(final Logger logger) { @@ -30,11 +46,20 @@ protected UserActivityAuditor(final Logger logger) { } @Override - protected UserActivity onUserActivity(final UserActivity userActivity, - final Operation operation, - final Resource resource, - final Boolean hasAccess) { - return userActivity; + protected void addActivity0(final RequestChannel.Session session, + final Operation operation, + final Resource resource, + final boolean hasAccess) { + final AuditKey auditKey = new AuditKey(session.principal(), session.clientAddress()); + + auditStorage.compute(auditKey, (key, userActivity) -> Objects.isNull(userActivity) + ? new UserActivity.UserActivityOperations() + : userActivity + ); } + @Override + protected AuditorDumpFormatter createFormatter() { + return new PrincipalAndIpFormatter(); + } } diff --git a/src/main/java/io/aiven/kafka/auth/audit/UserOperation.java b/src/main/java/io/aiven/kafka/auth/audit/UserOperation.java index 1f5f499..75e040f 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/UserOperation.java +++ b/src/main/java/io/aiven/kafka/auth/audit/UserOperation.java @@ -16,6 +16,7 @@ package io.aiven.kafka.auth.audit; +import java.net.InetAddress; import java.util.Objects; import kafka.security.auth.Operation; @@ -23,6 +24,8 @@ public class UserOperation { + public final InetAddress sourceIp; + public final Operation operation; public final Resource resource; @@ -32,6 +35,14 @@ public class UserOperation { public UserOperation(final Operation operation, final Resource resource, final boolean hasAccess) { + this(null, operation, resource, hasAccess); + } + + public UserOperation(final InetAddress sourceIp, + final Operation operation, + final Resource resource, + final boolean hasAccess) { + this.sourceIp = sourceIp; this.operation = operation; this.resource = resource; this.hasAccess = hasAccess; @@ -55,4 +66,5 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(operation, resource, hasAccess); } + } diff --git a/src/main/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditor.java b/src/main/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditor.java index ca71678..5bb5d6f 100644 --- a/src/main/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditor.java +++ b/src/main/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditor.java @@ -16,6 +16,9 @@ package io.aiven.kafka.auth.audit; +import java.util.Objects; + +import kafka.network.RequestChannel; import kafka.security.auth.Operation; import kafka.security.auth.Resource; import org.slf4j.Logger; @@ -23,6 +26,7 @@ public class UserOperationsActivityAuditor extends Auditor { public UserOperationsActivityAuditor() { + super(); } protected UserOperationsActivityAuditor(final Logger logger) { @@ -30,12 +34,56 @@ protected UserOperationsActivityAuditor(final Logger logger) { } @Override - protected UserActivity onUserActivity(final UserActivity userActivity, - final Operation operation, - final Resource resource, - final Boolean hasAccess) { - userActivity.addOperation(new UserOperation(operation, resource, hasAccess)); - return userActivity; + protected void addActivity0(final RequestChannel.Session session, + final Operation operation, + final Resource resource, + final boolean hasAccess) { + auditStorage.compute(createAuditKey(session), (key, userActivity) -> { + final UserActivity ua; + if (Objects.isNull(userActivity)) { + ua = createUserActivity(); + } else { + ua = userActivity; + } + ua.addOperation(new UserOperation(session.clientAddress(), operation, resource, hasAccess)); + return ua; + }); + } + + private AuditKey createAuditKey(final RequestChannel.Session session) { + final var grouping = auditorConfig.getAggregationGrouping(); + switch (grouping) { + case USER: + return new AuditKey(session.principal(), null); + case USER_AND_IP: + return new AuditKey(session.principal(), session.clientAddress()); + default: + throw new IllegalArgumentException("Unknown aggregation grouping type: " + grouping); + } } + private UserActivity createUserActivity() { + final var grouping = auditorConfig.getAggregationGrouping(); + switch (grouping) { + case USER: + return new UserActivity.UserActivityOperationsGropedByIP(); + case USER_AND_IP: + return new UserActivity.UserActivityOperations(); + default: + throw new IllegalArgumentException("Unknown aggregation grouping type: " + grouping); + } + } + + @Override + protected AuditorDumpFormatter createFormatter() { + final var grouping = auditorConfig.getAggregationGrouping(); + switch (grouping) { + case USER: + return new PrincipalFormatter(); + case USER_AND_IP: + return new PrincipalAndIpFormatter(); + default: + throw new IllegalArgumentException("Unknown aggregation grouping type: " + grouping); + } + } } diff --git a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java index 69d8004..b53acd0 100644 --- a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java +++ b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java @@ -25,6 +25,7 @@ import io.aiven.kafka.auth.audit.NoAuditor; import io.aiven.kafka.auth.audit.UserActivityAuditor; +import io.aiven.kafka.auth.audit.UserOperationsActivityAuditor; import org.junit.jupiter.api.Test; @@ -48,16 +49,28 @@ void correctMinimalConfig() { @Test void correctFullConfig() { - final Map properties = new HashMap<>(); - properties.put("aiven.acl.authorizer.configuration", "/test"); - properties.put("aiven.acl.authorizer.auditor.class.name", UserActivityAuditor.class.getName()); - properties.put("aiven.acl.authorizer.auditor.aggregation.period", "123"); - properties.put("aiven.acl.authorizer.log.denials", "false"); + final Map userActivityProps = new HashMap<>(); + userActivityProps.put("aiven.acl.authorizer.configuration", "/test"); + userActivityProps.put("aiven.acl.authorizer.auditor.class.name", UserActivityAuditor.class.getName()); + userActivityProps.put("aiven.acl.authorizer.auditor.aggregation.period", "123"); + userActivityProps.put("aiven.acl.authorizer.log.denials", "false"); - final AivenAclAuthorizerConfig config = new AivenAclAuthorizerConfig(properties); + var config = new AivenAclAuthorizerConfig(userActivityProps); assertEquals("/test", config.getConfigFile().getAbsolutePath()); assertEquals(UserActivityAuditor.class, config.getAuditor().getClass()); assertFalse(config.logDenials()); + + final Map userActivityOpsProps = new HashMap<>(); + userActivityOpsProps.put("aiven.acl.authorizer.configuration", "/test"); + userActivityOpsProps.put("aiven.acl.authorizer.auditor.class.name", + UserOperationsActivityAuditor.class.getName()); + userActivityOpsProps.put("aiven.acl.authorizer.auditor.aggregation.period", "123"); + userActivityOpsProps.put("aiven.acl.authorizer.log.denials", "false"); + + config = new AivenAclAuthorizerConfig(userActivityOpsProps); + assertEquals("/test", config.getConfigFile().getAbsolutePath()); + assertEquals(UserOperationsActivityAuditor.class, config.getAuditor().getClass()); + assertFalse(config.logDenials()); } @Test diff --git a/src/test/java/io/aiven/kafka/auth/audit/AuditorConfigTest.java b/src/test/java/io/aiven/kafka/auth/audit/AuditorConfigTest.java index a257a0c..c4399aa 100644 --- a/src/test/java/io/aiven/kafka/auth/audit/AuditorConfigTest.java +++ b/src/test/java/io/aiven/kafka/auth/audit/AuditorConfigTest.java @@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; class AuditorConfigTest { + @Test void correctMinimalConfig() { final Map properties = new HashMap<>(); @@ -34,6 +35,20 @@ void correctMinimalConfig() { final AuditorConfig config = new AuditorConfig(properties); assertEquals(123, config.getAggregationPeriodInSeconds()); + assertEquals(AuditorConfig.AggregationGrouping.USER_AND_IP, + config.getAggregationGrouping()); + } + + @Test + void correctFullConfig() { + final Map properties = new HashMap<>(); + properties.put("aiven.acl.authorizer.auditor.aggregation.period", "123"); + properties.put("aiven.acl.authorizer.auditor.aggregation.grouping", "user"); + + final AuditorConfig config = new AuditorConfig(properties); + assertEquals(123, config.getAggregationPeriodInSeconds()); + assertEquals(AuditorConfig.AggregationGrouping.USER, + config.getAggregationGrouping()); } @Test diff --git a/src/test/java/io/aiven/kafka/auth/audit/FormatterTestBase.java b/src/test/java/io/aiven/kafka/auth/audit/FormatterTestBase.java index f52c102..bc8fa9f 100644 --- a/src/test/java/io/aiven/kafka/auth/audit/FormatterTestBase.java +++ b/src/test/java/io/aiven/kafka/auth/audit/FormatterTestBase.java @@ -35,15 +35,29 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class FormatterTestBase { + protected RequestChannel.Session session; + protected Operation operation; + protected Resource resource; + protected AuditorDumpFormatter formatter; - private Operation anotherOperation; - private Resource anotherResource; - private RequestChannel.Session anotherSession; + + protected Operation anotherOperation; + + protected Resource anotherResource; + + protected RequestChannel.Session anotherSession; + protected InetAddress anotherInetAddress; + private final AuditorConfig.AggregationGrouping aggregationGrouping; + + protected FormatterTestBase(final AuditorConfig.AggregationGrouping aggregationGrouping) { + this.aggregationGrouping = aggregationGrouping; + } + void setUp() throws Exception { final KafkaPrincipal principal = new KafkaPrincipal("PRINCIPAL_TYPE", "PRINCIPAL_NAME"); session = new RequestChannel.Session(principal, InetAddress.getLocalHost()); @@ -67,37 +81,46 @@ void setUp() throws Exception { protected void zeroOperations(final ZonedDateTime now, final String expected) { final Map dump = new HashMap<>(); - final UserActivity userActivity = new UserActivity(now); - dump.put(Auditor.AuditKey.fromSession(session), userActivity); - + dump.put(createAuditKey(session), createUserActivity(now)); formatAndAssert(dump, expected); } protected void twoOperations(final ZonedDateTime now, final String expected) { final Map dump = new HashMap<>(); - final UserActivity userActivity = new UserActivity(now); - userActivity.operations.add(new UserOperation(operation, resource, false)); - userActivity.operations.add(new UserOperation(anotherOperation, anotherResource, true)); - dump.put(Auditor.AuditKey.fromSession(session), userActivity); + final UserActivity userActivity = createUserActivity(now); + userActivity.addOperation(new UserOperation(session.clientAddress(), operation, resource, false)); + userActivity.addOperation( + new UserOperation(session.clientAddress(), anotherOperation, anotherResource, true)); + dump.put(createAuditKey(session), userActivity); formatAndAssert(dump, expected); } - protected void twoOperationsTwoIpAddresses(final ZonedDateTime now, final String... expected) { - final Map dump = new HashMap<>(); - final UserActivity userActivity = new UserActivity(now); - userActivity.operations.add(new UserOperation(operation, resource, false)); - dump.put(Auditor.AuditKey.fromSession(session), userActivity); - - final UserActivity anotherUserActivity = new UserActivity(now); - anotherUserActivity.operations.add(new UserOperation(anotherOperation, anotherResource, true)); - dump.put(Auditor.AuditKey.fromSession(anotherSession), anotherUserActivity); + protected Auditor.AuditKey createAuditKey(final RequestChannel.Session session) { + switch (aggregationGrouping) { + case USER: + return new Auditor.AuditKey(session.principal(), null); + case USER_AND_IP: + return new Auditor.AuditKey(session.principal(), session.clientAddress()); + default: + throw new IllegalArgumentException("Unknown aggregation grouping: " + aggregationGrouping); + } + } - formatAndAssert(dump, expected); + protected UserActivity createUserActivity(final ZonedDateTime time) { + switch (aggregationGrouping) { + case USER: + return new UserActivity.UserActivityOperationsGropedByIP(time); + case USER_AND_IP: + return new UserActivity.UserActivityOperations(time); + default: + throw new IllegalArgumentException("Unknown aggregation grouping: " + aggregationGrouping); + } } - private void formatAndAssert(final Map dump, final String... expected) { + protected void formatAndAssert(final Map dump, final String... expected) { final List entries = formatter.format(dump); + assertEquals(expected.length, entries.size()); assertEquals(Arrays.asList(expected), entries); } diff --git a/src/test/java/io/aiven/kafka/auth/audit/PrincipalAndIpFormatterTest.java b/src/test/java/io/aiven/kafka/auth/audit/PrincipalAndIpFormatterTest.java index 30f79fd..d0dd702 100644 --- a/src/test/java/io/aiven/kafka/auth/audit/PrincipalAndIpFormatterTest.java +++ b/src/test/java/io/aiven/kafka/auth/audit/PrincipalAndIpFormatterTest.java @@ -18,6 +18,8 @@ import java.net.InetAddress; import java.time.ZonedDateTime; +import java.util.HashMap; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -26,6 +28,11 @@ * Tests for {@link PrincipalAndIpFormatter}. */ public class PrincipalAndIpFormatterTest extends FormatterTestBase { + + public PrincipalAndIpFormatterTest() { + super(AuditorConfig.AggregationGrouping.USER_AND_IP); + } + @BeforeEach public void setUp() throws Exception { super.setUp(); @@ -74,4 +81,20 @@ public void shouldBuildRightLogMessageTwoOperationsTwoIps() throws Exception { twoOperationsTwoIpAddresses(now, expected1, expected2); } + + protected void twoOperationsTwoIpAddresses(final ZonedDateTime now, final String... expected) { + final Map dump = new HashMap<>(); + + final UserActivity userActivity = createUserActivity(now); + userActivity.addOperation(new UserOperation(session.clientAddress(), operation, resource, false)); + dump.put(createAuditKey(session), userActivity); + + final UserActivity anotherUserActivity = createUserActivity(now); + anotherUserActivity.addOperation( + new UserOperation(anotherSession.clientAddress(), anotherOperation, anotherResource, true)); + dump.put(createAuditKey(anotherSession), anotherUserActivity); + + formatAndAssert(dump, expected); + } + } diff --git a/src/test/java/io/aiven/kafka/auth/audit/PrincipalFormatterTest.java b/src/test/java/io/aiven/kafka/auth/audit/PrincipalFormatterTest.java index 1b1ea6b..55b3ce6 100644 --- a/src/test/java/io/aiven/kafka/auth/audit/PrincipalFormatterTest.java +++ b/src/test/java/io/aiven/kafka/auth/audit/PrincipalFormatterTest.java @@ -18,14 +18,18 @@ import java.net.InetAddress; import java.time.ZonedDateTime; +import java.util.HashMap; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -/** - * Tests for {@link PrincipalFormatter}. - */ public class PrincipalFormatterTest extends FormatterTestBase { + + public PrincipalFormatterTest() { + super(AuditorConfig.AggregationGrouping.USER); + } + @BeforeEach public void setUp() throws Exception { super.setUp(); @@ -36,10 +40,8 @@ public void setUp() throws Exception { public void shouldBuildRightLogMessageZeroOperations() throws Exception { final ZonedDateTime now = ZonedDateTime.now(); final String expected = String.format( - "PRINCIPAL_TYPE:PRINCIPAL_NAME was active since %s. %s", - now.format(AuditorDumpFormatter.dateFormatter()), - InetAddress.getLocalHost() - ); + "PRINCIPAL_TYPE:PRINCIPAL_NAME was active since %s.", + now.format(AuditorDumpFormatter.dateFormatter())); zeroOperations(now, expected); } @@ -70,4 +72,18 @@ public void shouldBuildRightLogMessageTwoOperationsTwoIps() throws Exception { twoOperationsTwoIpAddresses(now, expected); } + + protected void twoOperationsTwoIpAddresses(final ZonedDateTime now, final String... expected) { + final Map dump = new HashMap<>(); + + final UserActivity userActivity = createUserActivity(now); + userActivity.addOperation( + new UserOperation(session.clientAddress(), operation, resource, false)); + userActivity.addOperation( + new UserOperation(anotherSession.clientAddress(), anotherOperation, anotherResource, true)); + dump.put(createAuditKey(session), userActivity); + + formatAndAssert(dump, expected); + } + } diff --git a/src/test/java/io/aiven/kafka/auth/audit/UserActivityAuditorTest.java b/src/test/java/io/aiven/kafka/auth/audit/UserActivityAuditorTest.java index 8531da8..56d3c20 100644 --- a/src/test/java/io/aiven/kafka/auth/audit/UserActivityAuditorTest.java +++ b/src/test/java/io/aiven/kafka/auth/audit/UserActivityAuditorTest.java @@ -20,12 +20,13 @@ import java.net.UnknownHostException; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Map; import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.security.auth.KafkaPrincipal; -import com.google.common.collect.ImmutableMap; import kafka.network.RequestChannel.Session; import kafka.security.auth.Operation; import kafka.security.auth.Resource; @@ -39,6 +40,7 @@ import org.slf4j.Logger; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -55,7 +57,8 @@ class UserActivityAuditorTest { @BeforeEach void setUp() throws Exception { - final KafkaPrincipal principal = new KafkaPrincipal("PRINCIPAL_TYPE", "PRINCIPAL_NAME"); + final KafkaPrincipal principal = + new KafkaPrincipal("PRINCIPAL_TYPE", "PRINCIPAL_NAME"); session = new Session(principal, InetAddress.getLocalHost()); resource = new Resource( @@ -98,9 +101,25 @@ public void shouldBuildRightLogMessage() throws UnknownHostException { assertTrue(diffSeconds < 3); } + @Test + void shouldThrowConfigExceptionForAggregationGrouping() { + + final var props = Map.of( + AuditorConfig.AGGREGATION_PERIOD_CONF, Long.MAX_VALUE, + AuditorConfig.AGGREGATION_GROUPING_CONF, AuditorConfig.AggregationGrouping.USER.getConfigValue() + ); + final var e = assertThrows( + ConfigException.class, () -> createAuditor(props)); + assertEquals("Grouping by user is not supported for this type of auditor", e.getMessage()); + } + private UserActivityAuditor createAuditor() { + return createAuditor(Map.of(AuditorConfig.AGGREGATION_PERIOD_CONF, Long.MAX_VALUE)); + } + + private UserActivityAuditor createAuditor(final Map props) { final UserActivityAuditor auditor = new UserActivityAuditor(logger); - auditor.configure(ImmutableMap.of(AuditorConfig.AGGREGATION_PERIOD_CONF, Long.MAX_VALUE)); + auditor.configure(props); return auditor; } } diff --git a/src/test/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditorTest.java b/src/test/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditorTest.java index 45d0a69..cdbf526 100644 --- a/src/test/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditorTest.java +++ b/src/test/java/io/aiven/kafka/auth/audit/UserOperationsActivityAuditorTest.java @@ -21,6 +21,7 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Arrays; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -28,7 +29,6 @@ import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.security.auth.KafkaPrincipal; -import com.google.common.collect.ImmutableMap; import kafka.network.RequestChannel.Session; import kafka.security.auth.Operation; import kafka.security.auth.Resource; @@ -69,11 +69,11 @@ void setUp() throws Exception { operation = Operation.fromJava(AclOperation.ALL); resource = - new Resource( - ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.CLUSTER), - "RESOURCE_NAME", - PatternType.LITERAL - ); + new Resource( + ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.CLUSTER), + "RESOURCE_NAME", + PatternType.LITERAL + ); } @Test @@ -91,10 +91,10 @@ public void shouldAddUserActivityAndOperation() throws UnknownHostException { auditor.addActivity(session, operation, resource, false); assertEquals(1, auditor.auditStorage.size()); assertEquals( - 1, - auditor.auditStorage.get( - new Auditor.AuditKey(principal, InetAddress.getLocalHost()) - ).operations.size() + 1, + cast(auditor.auditStorage.get( + new Auditor.AuditKey(principal, InetAddress.getLocalHost()) + ), UserActivity.UserActivityOperations.class).operations.size() ); auditor.dump(); assertEquals(0, auditor.auditStorage.size()); @@ -104,7 +104,7 @@ public void shouldAddUserActivityAndOperation() throws UnknownHostException { void shouldAggregateOperationsForSameUser() throws Exception { final Session anotherSession = - new Session(principal, InetAddress.getByName("127.0.0.2")); + new Session(principal, InetAddress.getByName("127.0.0.2")); final UserOperationsActivityAuditor auditor = createAuditor(); @@ -113,25 +113,58 @@ void shouldAggregateOperationsForSameUser() throws Exception { auditor.addActivity(anotherSession, operation, resource, true); assertEquals(2, auditor.auditStorage.size()); assertEquals( - 2, - auditor.auditStorage.get( - new Auditor.AuditKey( - session.principal(), - session.clientAddress()) - ).operations.size() + 2, + cast(auditor.auditStorage.get( + new Auditor.AuditKey( + session.principal(), + session.clientAddress()) + ), UserActivity.UserActivityOperations.class).operations.size() + ); + assertEquals( + 1, + cast(auditor.auditStorage.get( + new Auditor.AuditKey( + anotherSession.principal(), + anotherSession.clientAddress()) + ), UserActivity.UserActivityOperations.class).operations.size() ); + auditor.dump(); + assertEquals(0, auditor.auditStorage.size()); + } + + @Test + void shouldAggregateOperationsForSameUserAndPrincipalGrouping() throws Exception { + + final Session anotherSession = + new Session(principal, InetAddress.getByName("127.0.0.2")); + + final UserOperationsActivityAuditor auditor = + createAuditor(Map.of( + AuditorConfig.AGGREGATION_PERIOD_CONF, + 10L, + AuditorConfig.AGGREGATION_GROUPING_CONF, + AuditorConfig.AggregationGrouping.USER.getConfigValue())); + + auditor.addActivity(session, operation, resource, false); + auditor.addActivity(session, operation, resource, true); + auditor.addActivity(anotherSession, operation, resource, true); + assertEquals(1, auditor.auditStorage.size()); assertEquals( - 1, - auditor.auditStorage.get( - new Auditor.AuditKey( - anotherSession.principal(), - anotherSession.clientAddress()) - ).operations.size() + 2, + cast(auditor.auditStorage.get( + new Auditor.AuditKey( + session.principal(), + null) + ), UserActivity.UserActivityOperationsGropedByIP.class).operations.size() ); auditor.dump(); assertEquals(0, auditor.auditStorage.size()); } + private T cast(final UserActivity userActivity, final Class clazz) { + return clazz.cast(userActivity); + } + @Test public void shouldBuildRightLogMessage() throws Exception { final UserOperationsActivityAuditor auditor = createAuditor(); @@ -170,18 +203,71 @@ public void shouldBuildRightLogMessage() throws Exception { .split(",")).map(String::trim).collect(Collectors.toSet()); - final String[] expectedOperations = new String[] { - "Deny All on Cluster:RESOURCE_NAME", - "Allow Alter on DelegationToken:ANOTHER_RESOURCE_NAME" - }; + assertThat( + loggedOperations, + containsInAnyOrder( + "Deny All on Cluster:RESOURCE_NAME", + "Allow Alter on DelegationToken:ANOTHER_RESOURCE_NAME" + ) + ); + } - assertThat(loggedOperations, containsInAnyOrder(expectedOperations)); + @Test + public void shouldBuildRightLogMessageForPrincipalGrouping() throws Exception { + final UserOperationsActivityAuditor auditor = + createAuditor(Map.of( + AuditorConfig.AGGREGATION_PERIOD_CONF, + 10L, + AuditorConfig.AGGREGATION_GROUPING_CONF, + AuditorConfig.AggregationGrouping.USER.getConfigValue())); + final Operation anotherOperation = Operation.fromJava(AclOperation.ALTER); + final Resource anotherResource = new Resource( + ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.DELEGATION_TOKEN), + "ANOTHER_RESOURCE_NAME", + PatternType.LITERAL + ); + + final ArgumentCaptor logCaptor = ArgumentCaptor.forClass(String.class); + auditor.addActivity(session, operation, resource, false); + auditor.addActivity(session, anotherOperation, anotherResource, true); + auditor.dump(); + + verify(logger).info(logCaptor.capture()); + + final String expectedPrefix = + "PRINCIPAL_TYPE:PRINCIPAL_NAME was active since "; + + assertTrue(logCaptor.getValue().startsWith(expectedPrefix)); + final String timestampStr = + logCaptor.getValue() + .substring( + expectedPrefix.length(), + logCaptor.getValue().indexOf(". ") + ); + final Instant instant = Instant.parse(timestampStr); + final long diffSeconds = Math.abs(ChronoUnit.SECONDS.between(instant, Instant.now())); + assertTrue(diffSeconds < 3); + + final Set loggedOperations = Arrays.stream(logCaptor.getValue() + .substring(logCaptor.getValue().indexOf(": ") + 1) + .split(",")).map(String::trim).collect(Collectors.toSet()); + + assertThat( + loggedOperations, + containsInAnyOrder( + "Deny All on Cluster:RESOURCE_NAME", + "Allow Alter on DelegationToken:ANOTHER_RESOURCE_NAME") + ); } private UserOperationsActivityAuditor createAuditor() { + return createAuditor(Map.of(AuditorConfig.AGGREGATION_PERIOD_CONF, 10L)); + } + + private UserOperationsActivityAuditor createAuditor(final Map props) { final UserOperationsActivityAuditor auditor = - new UserOperationsActivityAuditor(logger); - auditor.configure(ImmutableMap.of(AuditorConfig.AGGREGATION_PERIOD_CONF, 10L)); + new UserOperationsActivityAuditor(logger); + auditor.configure(props); return auditor; }