Skip to content

Commit

Permalink
Merge pull request #206 from Aiven-Open/jeqo/metrics
Browse files Browse the repository at this point in the history
feat: add authorize operation metrics
  • Loading branch information
biggusdonzus authored Oct 15, 2024
2 parents b1e1166 + 6f348d4 commit 0e175d1
Show file tree
Hide file tree
Showing 6 changed files with 427 additions and 12 deletions.
68 changes: 59 additions & 9 deletions src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,37 @@
import java.io.File;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.Sensor;

import io.aiven.kafka.auth.audit.AuditorAPI;
import io.aiven.kafka.auth.audit.NoAuditor;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

public final class AivenAclAuthorizerConfig extends AbstractConfig {
private static final String CONFIGURATION_CONF = "aiven.acl.authorizer.configuration";
private static final String AUDITOR_CLASS_NAME_CONF = "aiven.acl.authorizer.auditor.class.name";
private static final String LOG_DENIALS_CONF = "aiven.acl.authorizer.log.denials";
private static final String CONFIG_REFRESH_CONF = "aiven.acl.authorizer.config.refresh.interval";
private static final String LIST_ACLS_ENABLED_CONF = "aiven.acl.authorizer.list.acls.enabled";
public static final String PREFIX = "aiven.acl.authorizer.";
private static final String CONFIGURATION_CONF = PREFIX + "configuration";
private static final String AUDITOR_CLASS_NAME_CONF = PREFIX + "auditor.class.name";
private static final String LOG_DENIALS_CONF = PREFIX + "log.denials";
private static final String CONFIG_REFRESH_CONF = PREFIX + "config.refresh.interval";
private static final String LIST_ACLS_ENABLED_CONF = PREFIX + "list.acls.enabled";


public static final String METRICS_NUM_SAMPLES_CONFIG = PREFIX
+ CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
private static final String METRICS_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC;

public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = PREFIX
+ CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
private static final String METRICS_SAMPLE_WINDOW_MS_DOC = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC;

public static final String METRICS_RECORDING_LEVEL_CONFIG = PREFIX
+ CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;

public AivenAclAuthorizerConfig(final Map<?, ?> originals) {
super(configDef(), originals);
Expand All @@ -44,30 +63,61 @@ public static ConfigDef configDef() {
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"The path to the configuration file"
).define(
)
.define(
AUDITOR_CLASS_NAME_CONF,
ConfigDef.Type.CLASS,
NoAuditor.class,
ConfigDef.Importance.MEDIUM,
"The auditor class fully qualified name"
).define(
)
.define(
LOG_DENIALS_CONF,
ConfigDef.Type.BOOLEAN,
true,
ConfigDef.Importance.LOW,
"Whether to log denials on INFO level"
).define(
)
.define(
CONFIG_REFRESH_CONF,
ConfigDef.Type.INT,
10_000,
ConfigDef.Importance.LOW,
"The interval between ACL reloads"
).define(
)
.define(
LIST_ACLS_ENABLED_CONF,
ConfigDef.Type.BOOLEAN,
true,
ConfigDef.Importance.LOW,
"Whether to allow listing ACLs"
)
// metric configs
.define(
METRICS_SAMPLE_WINDOW_MS_CONFIG,
ConfigDef.Type.LONG,
30000,
atLeast(1),
ConfigDef.Importance.LOW,
METRICS_SAMPLE_WINDOW_MS_DOC
)
.define(
METRICS_NUM_SAMPLES_CONFIG,
ConfigDef.Type.INT,
2,
atLeast(1),
ConfigDef.Importance.LOW,
METRICS_NUM_SAMPLES_DOC
)
.define(
METRICS_RECORDING_LEVEL_CONFIG,
ConfigDef.Type.STRING,
Sensor.RecordingLevel.INFO.toString(),
in(Sensor.RecordingLevel.INFO.toString(),
Sensor.RecordingLevel.DEBUG.toString(),
Sensor.RecordingLevel.TRACE.toString()),
ConfigDef.Importance.LOW,
METRICS_RECORDING_LEVEL_DOC
);
}

Expand Down
159 changes: 159 additions & 0 deletions src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright 2024 Aiven Oy https://aiven.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.auth;

import java.util.List;

import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.authorizer.AuthorizationResult;

public class AivenAclAuthorizerMetrics {
static final String METRIC_GROUP_NAME = "auth-metrics";
static final String AUTH_OP_ALLOW = "auth-ops-allow";
static final String AUTH_OP_ALLOW_DESC = "successful authorizer operations allowed based on ACLs";
static final String AUTH_OP_ALLOW_RATE = AUTH_OP_ALLOW + "-rate";
static final String AUTH_OP_ALLOW_TOTAL = AUTH_OP_ALLOW + "-total";
final MetricNameTemplate authOpAllowRate = new MetricNameTemplate(
AUTH_OP_ALLOW_RATE,
METRIC_GROUP_NAME,
"Rate of " + AUTH_OP_ALLOW_DESC
);
final MetricNameTemplate authOpAllowRateByOperation = new MetricNameTemplate(
AUTH_OP_ALLOW_RATE,
METRIC_GROUP_NAME,
"Rate of " + AUTH_OP_ALLOW_DESC + " tagged by operation type",
"operation"
);
final MetricNameTemplate authOpAllowTotal = new MetricNameTemplate(
AUTH_OP_ALLOW_TOTAL,
METRIC_GROUP_NAME,
"Total number of " + AUTH_OP_ALLOW_DESC
);
final MetricNameTemplate authOpAllowTotalByOperation = new MetricNameTemplate(
AUTH_OP_ALLOW_TOTAL,
METRIC_GROUP_NAME,
"Total number of " + AUTH_OP_ALLOW_DESC + " tagged by operation type",
"operation"
);
static final String AUTH_OP_DENY = "auth-ops-deny";
static final String AUTH_OP_DENY_DESC = "authorizer operations denied/blocked based on ACLs";
static final String AUTH_OP_DENY_RATE = AUTH_OP_DENY + "-rate";
static final String AUTH_OP_DENY_TOTAL = AUTH_OP_DENY + "-total";
final MetricNameTemplate authOpDenyRate = new MetricNameTemplate(
AUTH_OP_DENY_RATE,
METRIC_GROUP_NAME,
"Rate of " + AUTH_OP_DENY_DESC
);
final MetricNameTemplate authOpDenyRateByOperationResourcePrincipal = new MetricNameTemplate(
AUTH_OP_DENY_RATE,
METRIC_GROUP_NAME,
"Rate of " + AUTH_OP_DENY_DESC + " tagged by operation type, resource name, and principal name",
"operation",
"resource",
"principal"
);
final MetricNameTemplate authOpDenyTotal = new MetricNameTemplate(
AUTH_OP_DENY_TOTAL,
METRIC_GROUP_NAME,
"Total number of " + AUTH_OP_DENY_DESC
);
final MetricNameTemplate authOpDenyTotalByOperationResourcePrincipal = new MetricNameTemplate(
AUTH_OP_DENY_TOTAL,
METRIC_GROUP_NAME,
"Total number of " + AUTH_OP_DENY_DESC + " tagged by operation type, resource name, and principal name",
"operation",
"resource",
"principal"
);

final Metrics metrics;
final Sensor authOpAllowSensor;
final Sensor authOpDenySensor;

public AivenAclAuthorizerMetrics(final Time time, final MetricConfig metricConfig) {
final JmxReporter reporter = new JmxReporter();

this.metrics = new Metrics(
metricConfig,
List.of(reporter),
time,
new KafkaMetricsContext("aiven.kafka.auth")
);

authOpAllowSensor = metrics.sensor(AUTH_OP_ALLOW, RecordingLevel.INFO);
authOpAllowSensor.add(metrics.metricInstance(authOpAllowRate), new Rate());
authOpAllowSensor.add(metrics.metricInstance(authOpAllowTotal), new CumulativeCount());
authOpDenySensor = metrics.sensor(AUTH_OP_DENY, RecordingLevel.INFO);
authOpDenySensor.add(metrics.metricInstance(authOpDenyRate), new Rate());
authOpDenySensor.add(metrics.metricInstance(authOpDenyTotal), new CumulativeCount());
}

public void recordLogAuthResult(
final AuthorizationResult result,
final AclOperation operation,
final ResourcePattern resourcePattern,
final KafkaPrincipal principal
) {
switch (result) {
case ALLOWED:
authOpAllowSensor.add(
metrics.metricInstance(
authOpAllowRateByOperation,
"operation", operation.name()
),
new Rate());
authOpAllowSensor.add(
metrics.metricInstance(
authOpAllowTotalByOperation,
"operation", operation.name()
),
new CumulativeCount());
authOpAllowSensor.record();
break;
case DENIED:
authOpDenySensor.add(
metrics.metricInstance(
authOpDenyRateByOperationResourcePrincipal,
"operation", operation.name(),
"resource", resourcePattern.name(),
"principal", principal.getName()),
new Rate());
authOpDenySensor.add(
metrics.metricInstance(
authOpDenyTotalByOperationResourcePrincipal,
"operation", operation.name(),
"resource", resourcePattern.name(),
"principal", principal.getName()),
new CumulativeCount());
authOpDenySensor.record();
break;
default: break;
}
}
}
29 changes: 28 additions & 1 deletion src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
Expand All @@ -63,24 +66,45 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.aiven.kafka.auth.AivenAclAuthorizerConfig.METRICS_NUM_SAMPLES_CONFIG;
import static io.aiven.kafka.auth.AivenAclAuthorizerConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static io.aiven.kafka.auth.AivenAclAuthorizerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG;

public class AivenAclAuthorizerV2 implements Authorizer {

private static final Logger LOGGER = LoggerFactory.getLogger(AivenAclAuthorizerV2.class);

private File configFile;
private AuditorAPI auditor;
private boolean logDenials;
private ScheduledExecutorService scheduledExecutorService;

private volatile WatchService watchService;

private final AtomicReference<VerdictCache> cacheReference = new AtomicReference<>();
private final Time time;

private AivenAclAuthorizerConfig config;
private AivenAclAuthorizerMetrics metrics;

public AivenAclAuthorizerV2() {
this(Time.SYSTEM);
}

// for testing
AivenAclAuthorizerV2(final Time time) {
this.time = time;
}

@Override
public void configure(final java.util.Map<String, ?> configs) {
config = new AivenAclAuthorizerConfig(configs);

final MetricConfig metricConfig = new MetricConfig()
.samples(config.getInt(METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)));
metrics = new AivenAclAuthorizerMetrics(time, metricConfig);
}

@Override
Expand Down Expand Up @@ -171,13 +195,16 @@ public final List<AuthorizationResult> authorize(final AuthorizableRequestContex
final boolean verdict = cacheReference.get().get(principal,
LegacyOperationNameFormatter.format(operation),
resourceToCheck);
final var authResult = verdict ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED;

metrics.recordLogAuthResult(authResult, operation, resourcePattern, principal);
logAuthVerdict(verdict, operation, resourcePattern, principal, requestContext,
action.logIfAllowed(), action.logIfDenied());

final var session = new Session(principal, requestContext.clientAddress());
auditor.addActivity(session, action.operation(), action.resourcePattern(), verdict);

result.add(verdict ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED);
result.add(authResult);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;

import io.aiven.kafka.auth.audit.NoAuditor;
import io.aiven.kafka.auth.audit.UserActivityAuditor;
import io.aiven.kafka.auth.audit.UserOperationsActivityAuditor;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -46,6 +48,13 @@ void correctMinimalConfig() {
assertEquals(NoAuditor.class, config.getAuditor().getClass());
assertTrue(config.logDenials());
assertTrue(config.listAclsEnabled());
// metric configs
assertThat(config.getLong(AivenAclAuthorizerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG))
.isEqualTo(30_000);
assertThat(config.getInt(AivenAclAuthorizerConfig.METRICS_NUM_SAMPLES_CONFIG))
.isEqualTo(2);
assertThat(config.getString(AivenAclAuthorizerConfig.METRICS_RECORDING_LEVEL_CONFIG))
.isEqualTo(Sensor.RecordingLevel.INFO.name());
}

@Test
Expand All @@ -57,13 +66,23 @@ void correctFullConfig() {
userActivityProps.put("aiven.acl.authorizer.log.denials", "false");
userActivityProps.put("aiven.acl.authorizer.config.refresh.interval", "10");
userActivityProps.put("aiven.acl.authorizer.list.acls.enabled", "false");
userActivityProps.put("aiven.acl.authorizer.metrics.sample.window.ms", "10000");
userActivityProps.put("aiven.acl.authorizer.metrics.num.samples", "10");
userActivityProps.put("aiven.acl.authorizer.metrics.recording.level", "DEBUG");

var config = new AivenAclAuthorizerConfig(userActivityProps);
assertEquals("/test", config.getConfigFile().getAbsolutePath());
assertEquals(UserActivityAuditor.class, config.getAuditor().getClass());
assertFalse(config.logDenials());
assertEquals(10, config.configRefreshInterval());
assertFalse(config.listAclsEnabled());
// metrics
assertThat(config.getLong(AivenAclAuthorizerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG))
.isEqualTo(10_000);
assertThat(config.getInt(AivenAclAuthorizerConfig.METRICS_NUM_SAMPLES_CONFIG))
.isEqualTo(10);
assertThat(config.getString(AivenAclAuthorizerConfig.METRICS_RECORDING_LEVEL_CONFIG))
.isEqualTo(Sensor.RecordingLevel.DEBUG.name());

final Map<String, String> userActivityOpsProps = new HashMap<>();
userActivityOpsProps.put("aiven.acl.authorizer.configuration", "/test");
Expand Down
Loading

0 comments on commit 0e175d1

Please sign in to comment.