Skip to content

Commit

Permalink
Merge pull request #29 from aiven/use-clean-build-in-wrkflow
Browse files Browse the repository at this point in the history
Code refactoring
  • Loading branch information
ivanyu authored Jul 2, 2021
2 parents f0051a7 + 62e404e commit eb0de44
Show file tree
Hide file tree
Showing 16 changed files with 492 additions and 220 deletions.
59 changes: 17 additions & 42 deletions src/main/java/io/aiven/kafka/auth/audit/Auditor.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.aiven.kafka.auth.audit.AuditorConfig.AggregationGrouping.PRINCIPAL;
import static io.aiven.kafka.auth.audit.AuditorConfig.AggregationGrouping.PRINCIPAL_AND_SOURCE_IP;

public abstract class Auditor implements AuditorAPI {

private final Logger logger;
Expand All @@ -45,10 +42,9 @@ public abstract class Auditor implements AuditorAPI {

private final Lock auditLock = new ReentrantLock();

private final ScheduledExecutorService auditScheduler =
Executors.newScheduledThreadPool(1);
private final ScheduledExecutorService auditScheduler = Executors.newScheduledThreadPool(1);

private AuditorDumpFormatter formatter;
protected AuditorConfig auditorConfig;

public Auditor() {
this(LoggerFactory.getLogger("aiven.auditor.logger"));
Expand All @@ -61,64 +57,49 @@ protected Auditor(final Logger logger) {

@Override
public void configure(final Map<String, ?> configs) {
final AuditorConfig auditorConfig = new AuditorConfig(configs);
auditorConfig = new AuditorConfig(configs);
auditScheduler.scheduleAtFixedRate(
this::dump,
auditorConfig.getAggregationPeriodInSeconds(),
auditorConfig.getAggregationPeriodInSeconds(),
TimeUnit.SECONDS
);

formatter = createFormatter(auditorConfig);
}

private AuditorDumpFormatter createFormatter(final AuditorConfig auditorConfig) {
final String grouping = auditorConfig.getAggregationGrouping();
if (PRINCIPAL_AND_SOURCE_IP.getConfigValue().equals(grouping)) {
return new PrincipalAndIpFormatter();
} else if (PRINCIPAL.getConfigValue().equals(grouping)) {
return new PrincipalFormatter();
} else {
throw new RuntimeException("Not implemented");
}
}

@Override
public void addActivity(final Session session,
final Operation operation,
final Resource resource,
final boolean hasAccess) {

final AuditKey auditKey = AuditKey.fromSession(session);
public final void addActivity(final Session session,
final Operation operation,
final Resource resource,
final boolean hasAccess) {
auditLock.lock();
try {
auditStorage.compute(auditKey, (key, userActivity) ->
onUserActivity(Objects.isNull(userActivity)
? new UserActivity()
: userActivity, operation, resource, hasAccess)
);
addActivity0(session, operation, resource, hasAccess);
} finally {
auditLock.unlock();
}
}

protected abstract void addActivity0(final Session session,
final Operation operation,
final Resource resource,
final boolean hasAccess);

@Override
public void stop() {
dump();
auditScheduler.shutdownNow();
try {
auditScheduler.awaitTermination(5, TimeUnit.SECONDS);
dump();
} catch (final InterruptedException e) {
// Intentionally ignored
}
}

protected void dump() {
try {
formatter.format(makeDump())
.forEach(logger::info);
createFormatter().format(makeDump()).forEach(logger::info);
} catch (final Exception e) {
// handle or the background thread can die!
logger.warn("Couldn't dump messages", e);
}
}

Expand All @@ -134,10 +115,7 @@ private Map<AuditKey, UserActivity> makeDump() {
return auditStorageDump;
}

protected abstract UserActivity onUserActivity(final UserActivity userActivity,
final Operation operation,
final Resource resource,
final Boolean hasAccess);
protected abstract AuditorDumpFormatter createFormatter();

protected static class AuditKey {

Expand Down Expand Up @@ -168,8 +146,5 @@ public int hashCode() {
return Objects.hash(principal, sourceIp);
}

public static AuditKey fromSession(final Session session) {
return new AuditKey(session.principal(), session.clientAddress());
}
}
}
28 changes: 19 additions & 9 deletions src/main/java/io/aiven/kafka/auth/audit/AuditorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import static io.aiven.kafka.auth.audit.AuditorConfig.AggregationGrouping.PRINCIPAL;
import static io.aiven.kafka.auth.audit.AuditorConfig.AggregationGrouping.PRINCIPAL_AND_SOURCE_IP;
import static io.aiven.kafka.auth.audit.AuditorConfig.AggregationGrouping.USER;
import static io.aiven.kafka.auth.audit.AuditorConfig.AggregationGrouping.USER_AND_IP;

public class AuditorConfig extends AbstractConfig {

static final String AGGREGATION_PERIOD_CONF = "aiven.acl.authorizer.auditor.aggregation.period";
static final String AGGREGATION_GROUPING_CONF = "aiven.acl.authorizer.auditor.aggregation.grouping";

public enum AggregationGrouping {
PRINCIPAL("principal"),
PRINCIPAL_AND_SOURCE_IP("principal_and_source_ip");
USER("user"),
USER_AND_IP("user_and_ip");

private final String configValue;

Expand All @@ -42,6 +42,16 @@ public enum AggregationGrouping {
public String getConfigValue() {
return configValue;
}

public static AggregationGrouping fromConfigValue(final String configValue) {
for (final var ag : values()) {
if (ag.configValue.equals(configValue)) {
return ag;
}
}
throw new IllegalArgumentException("Unsupported aggregation grouping: " + configValue);
}

}

public AuditorConfig(final Map<?, ?> originals) {
Expand All @@ -60,9 +70,9 @@ public static ConfigDef configDef() {
).define(
AGGREGATION_GROUPING_CONF,
ConfigDef.Type.STRING,
PRINCIPAL_AND_SOURCE_IP.getConfigValue(),
ConfigDef.ValidString.in(PRINCIPAL.getConfigValue(),
PRINCIPAL_AND_SOURCE_IP.getConfigValue()),
USER_AND_IP.getConfigValue(),
ConfigDef.ValidString.in(USER.getConfigValue(),
USER_AND_IP.getConfigValue()),
ConfigDef.Importance.HIGH,
"The auditor aggregation grouping key."
);
Expand All @@ -72,7 +82,7 @@ public long getAggregationPeriodInSeconds() {
return getLong(AGGREGATION_PERIOD_CONF);
}

public String getAggregationGrouping() {
return getString(AGGREGATION_GROUPING_CONF);
public AggregationGrouping getAggregationGrouping() {
return AggregationGrouping.fromConfigValue(getString(AGGREGATION_GROUPING_CONF));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,12 @@ public interface AuditorDumpFormatter {
static DateTimeFormatter dateFormatter() {
return DateTimeFormatter.ISO_INSTANT;
}

default String formatUserOperation(final UserOperation userOperation) {
return (userOperation.hasAccess ? "Allow" : "Deny")
+ " " + userOperation.operation.name() + " on "
+ userOperation.resource.resourceType() + ":"
+ userOperation.resource.name();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,29 @@
* of each principal.
*/
public class PrincipalAndIpFormatter implements AuditorDumpFormatter {
PrincipalAndIpFormatter() {
}

@Override
public List<String> format(final Map<Auditor.AuditKey, UserActivity> dump) {
return dump.entrySet().stream()
.map(e -> auditMessagePrincipalAndSourceIp(e.getKey(), e.getValue()))
.map(e -> buildAuditMessage(e.getKey(), e.getValue()))
.collect(Collectors.toList());
}

private String auditMessagePrincipalAndSourceIp(final Auditor.AuditKey key, final UserActivity userActivity) {
private String buildAuditMessage(final Auditor.AuditKey key, final UserActivity userActivity) {
final var ua = (UserActivity.UserActivityOperations) userActivity;
final StringBuilder auditMessage = new StringBuilder(key.principal.toString());
auditMessage
.append(" (").append(key.sourceIp).append(")")
.append(" was active since ")
.append(userActivity.activeSince.format(AuditorDumpFormatter.dateFormatter()));
if (userActivity.hasOperations()) {
.append(ua.activeSince.format(AuditorDumpFormatter.dateFormatter()));
if (!ua.operations.isEmpty()) {
auditMessage.append(": ")
.append(userActivity
.operations
.stream()
.map(this::userOperationMessage)
.append(ua
.operations.stream()
.map(this::formatUserOperation)
.collect(Collectors.joining(", ")));
}
return auditMessage.toString();
}

private String userOperationMessage(final UserOperation op) {
return (op.hasAccess ? "Allow" : "Deny")
+ " " + op.operation.name() + " on "
+ op.resource.resourceType() + ":"
+ op.resource.name();
}
}
73 changes: 21 additions & 52 deletions src/main/java/io/aiven/kafka/auth/audit/PrincipalFormatter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,76 +16,45 @@

package io.aiven.kafka.auth.audit;

import java.net.InetAddress;
import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.security.auth.KafkaPrincipal;

/**
* An {@link AuditorDumpFormatter} that creates one entry per principal,
* having info of each IP address in that entry.
*/
public class PrincipalFormatter implements AuditorDumpFormatter {
@Override
public List<String> format(final Map<Auditor.AuditKey, UserActivity> dump) {
return dump.keySet().stream()
.map(k -> k.principal)
.sorted(Comparator.comparing(KafkaPrincipal::toString))
.distinct()
.map(principal -> {
final Map<Auditor.AuditKey, UserActivity> principalActivities = dump.entrySet().stream()
.filter(e -> e.getKey().principal.equals(principal))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));


return auditMessagePrincipal(principal, principalActivities);
})
return dump.entrySet().stream()
.map(e -> buildAuditMessage(e.getKey(), (UserActivity.UserActivityOperationsGropedByIP) e.getValue()))
.collect(Collectors.toList());
}

private String auditMessagePrincipal(final KafkaPrincipal principal,
final Map<Auditor.AuditKey, UserActivity> principalActivities) {
final ZonedDateTime earliest = principalActivities.values().stream()
.map(ua -> ua.activeSince)
.sorted()
.findFirst()
.get();

final StringBuilder auditMessage = new StringBuilder(principal.toString());
auditMessage
private String buildAuditMessage(final Auditor.AuditKey auditKey,
final UserActivity.UserActivityOperationsGropedByIP userActivity) {
final var auditMessage = new StringBuilder(auditKey.principal.toString())
.append(" was active since ")
.append(earliest.format(AuditorDumpFormatter.dateFormatter()))
.append(userActivity.activeSince.format(AuditorDumpFormatter.dateFormatter()))
.append(".");

final String allActivities = principalActivities.entrySet().stream()
.map(e -> {
final InetAddress sourceIp = e.getKey().sourceIp;
final UserActivity userActivity = e.getValue();
final List<String> operations = userActivity.operations.stream().map(op ->
(op.hasAccess ? "Allow" : "Deny")
+ " " + op.operation.name() + " on "
+ op.resource.resourceType() + ":"
+ op.resource.name()
).collect(Collectors.toList());

final StringBuilder sb = new StringBuilder();
sb.append(sourceIp.toString());
if (!operations.isEmpty()) {
sb.append(": ");
sb.append(String.join(", ", operations));
}
return sb.toString();
})
.collect(Collectors.joining(", "));
if (!allActivities.isBlank()) {
if (!userActivity.operations.isEmpty()) {
auditMessage.append(" ");
auditMessage.append(allActivities);
auditMessage.append(
userActivity
.operations.entrySet().stream()
.map(e -> {
final var operations = new StringBuilder();
operations.append(e.getKey()).append(": ");
operations.append(
e.getValue().stream()
.map(this::formatUserOperation)
.collect(Collectors.joining(", "))
);
return operations.toString();
}).collect(Collectors.joining(", "))
);
}

return auditMessage.toString();
}

Expand Down
Loading

0 comments on commit eb0de44

Please sign in to comment.