diff --git a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java index 6082274..9c9921c 100644 --- a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java +++ b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java @@ -19,18 +19,37 @@ import java.io.File; import java.util.Map; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.metrics.Sensor; import io.aiven.kafka.auth.audit.AuditorAPI; import io.aiven.kafka.auth.audit.NoAuditor; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.ValidString.in; + public final class AivenAclAuthorizerConfig extends AbstractConfig { - private static final String CONFIGURATION_CONF = "aiven.acl.authorizer.configuration"; - private static final String AUDITOR_CLASS_NAME_CONF = "aiven.acl.authorizer.auditor.class.name"; - private static final String LOG_DENIALS_CONF = "aiven.acl.authorizer.log.denials"; - private static final String CONFIG_REFRESH_CONF = "aiven.acl.authorizer.config.refresh.interval"; - private static final String LIST_ACLS_ENABLED_CONF = "aiven.acl.authorizer.list.acls.enabled"; + public static final String PREFIX = "aiven.acl.authorizer."; + private static final String CONFIGURATION_CONF = PREFIX + "configuration"; + private static final String AUDITOR_CLASS_NAME_CONF = PREFIX + "auditor.class.name"; + private static final String LOG_DENIALS_CONF = PREFIX + "log.denials"; + private static final String CONFIG_REFRESH_CONF = PREFIX + "config.refresh.interval"; + private static final String LIST_ACLS_ENABLED_CONF = PREFIX + "list.acls.enabled"; + + + public static final String METRICS_NUM_SAMPLES_CONFIG = PREFIX + + CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; + private static final String METRICS_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC; + + public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = PREFIX + + CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; + private static final String METRICS_SAMPLE_WINDOW_MS_DOC = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC; + + public static final String METRICS_RECORDING_LEVEL_CONFIG = PREFIX + + CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG; + private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC; public AivenAclAuthorizerConfig(final Map originals) { super(configDef(), originals); @@ -44,30 +63,61 @@ public static ConfigDef configDef() { ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH, "The path to the configuration file" - ).define( + ) + .define( AUDITOR_CLASS_NAME_CONF, ConfigDef.Type.CLASS, NoAuditor.class, ConfigDef.Importance.MEDIUM, "The auditor class fully qualified name" - ).define( + ) + .define( LOG_DENIALS_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, "Whether to log denials on INFO level" - ).define( + ) + .define( CONFIG_REFRESH_CONF, ConfigDef.Type.INT, 10_000, ConfigDef.Importance.LOW, "The interval between ACL reloads" - ).define( + ) + .define( LIST_ACLS_ENABLED_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.LOW, "Whether to allow listing ACLs" + ) + // metric configs + .define( + METRICS_SAMPLE_WINDOW_MS_CONFIG, + ConfigDef.Type.LONG, + 30000, + atLeast(1), + ConfigDef.Importance.LOW, + METRICS_SAMPLE_WINDOW_MS_DOC + ) + .define( + METRICS_NUM_SAMPLES_CONFIG, + ConfigDef.Type.INT, + 2, + atLeast(1), + ConfigDef.Importance.LOW, + METRICS_NUM_SAMPLES_DOC + ) + .define( + METRICS_RECORDING_LEVEL_CONFIG, + ConfigDef.Type.STRING, + Sensor.RecordingLevel.INFO.toString(), + in(Sensor.RecordingLevel.INFO.toString(), + Sensor.RecordingLevel.DEBUG.toString(), + Sensor.RecordingLevel.TRACE.toString()), + ConfigDef.Importance.LOW, + METRICS_RECORDING_LEVEL_DOC ); } diff --git a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerMetrics.java b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerMetrics.java new file mode 100644 index 0000000..071f414 --- /dev/null +++ b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerMetrics.java @@ -0,0 +1,108 @@ +/* + * Copyright 2024 Aiven Oy https://aiven.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.auth; + +import java.util.List; + +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.KafkaMetricsContext; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; +import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.authorizer.AuthorizationResult; + +public class AivenAclAuthorizerMetrics { + static final String METRIC_GROUP_NAME = "auth-metrics"; + static final String AUTH_OP_ALLOW = "auth-ops-allow"; + static final String AUTH_OP_ALLOW_TOTAL = AUTH_OP_ALLOW + "-total"; + final MetricNameTemplate authOpAllowTotal = new MetricNameTemplate(AUTH_OP_ALLOW_TOTAL, METRIC_GROUP_NAME, ""); + final MetricNameTemplate authOpAllowTotalByOperation = new MetricNameTemplate( + AUTH_OP_ALLOW_TOTAL, + METRIC_GROUP_NAME, + "", + "operation" + ); + static final String AUTH_OP_DENY = "auth-ops-deny"; + static final String AUTH_OP_DENY_TOTAL = AUTH_OP_DENY + "-total"; + final MetricNameTemplate authOpDenyTotal = new MetricNameTemplate(AUTH_OP_DENY_TOTAL, METRIC_GROUP_NAME, ""); + final MetricNameTemplate authOpDenyTotalByOperationResourcePrincipal = new MetricNameTemplate( + AUTH_OP_DENY_TOTAL, + METRIC_GROUP_NAME, + "", + "operation", + "resource", + "principal" + ); + + final Metrics metrics; + final Sensor authOpAllowSensor; + final Sensor authOpDenySensor; + + public AivenAclAuthorizerMetrics(final Time time, final MetricConfig metricConfig) { + final JmxReporter reporter = new JmxReporter(); + + this.metrics = new Metrics( + metricConfig, + List.of(reporter), + time, + new KafkaMetricsContext("aiven.kafka.auth") + ); + + authOpAllowSensor = metrics.sensor(AUTH_OP_ALLOW, RecordingLevel.INFO); + authOpAllowSensor.add(metrics.metricInstance(authOpAllowTotal), new CumulativeSum()); + authOpDenySensor = metrics.sensor(AUTH_OP_DENY, RecordingLevel.INFO); + authOpDenySensor.add(metrics.metricInstance(authOpDenyTotal), new CumulativeSum()); + } + + public void recordLogAuthResult( + final AuthorizationResult result, + final AclOperation operation, + final ResourcePattern resourcePattern, + final KafkaPrincipal principal + ) { + switch (result) { + case ALLOWED: + authOpAllowSensor.add( + metrics.metricInstance( + authOpAllowTotalByOperation, + "operation", operation.name() + ), + new CumulativeCount()); + authOpAllowSensor.record(); + break; + case DENIED: + authOpDenySensor.add( + metrics.metricInstance( + authOpDenyTotalByOperationResourcePrincipal, + "operation", operation.name(), + "resource", resourcePattern.name(), + "principal", principal.getName()), + new CumulativeCount()); + authOpDenySensor.record(); + break; + default: break; + } + } +} diff --git a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java index 1a9c1ad..bba42ae 100644 --- a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java +++ b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java @@ -41,8 +41,11 @@ import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.authorizer.AclCreateResult; import org.apache.kafka.server.authorizer.AclDeleteResult; import org.apache.kafka.server.authorizer.Action; @@ -63,24 +66,45 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static io.aiven.kafka.auth.AivenAclAuthorizerConfig.METRICS_NUM_SAMPLES_CONFIG; +import static io.aiven.kafka.auth.AivenAclAuthorizerConfig.METRICS_RECORDING_LEVEL_CONFIG; +import static io.aiven.kafka.auth.AivenAclAuthorizerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG; + public class AivenAclAuthorizerV2 implements Authorizer { private static final Logger LOGGER = LoggerFactory.getLogger(AivenAclAuthorizerV2.class); + private File configFile; private AuditorAPI auditor; private boolean logDenials; private ScheduledExecutorService scheduledExecutorService; + private volatile WatchService watchService; + private final AtomicReference cacheReference = new AtomicReference<>(); + private final Time time; private AivenAclAuthorizerConfig config; + private AivenAclAuthorizerMetrics metrics; public AivenAclAuthorizerV2() { + this(Time.SYSTEM); + } + + // for testing + AivenAclAuthorizerV2(final Time time) { + this.time = time; } @Override public void configure(final java.util.Map configs) { config = new AivenAclAuthorizerConfig(configs); + + final MetricConfig metricConfig = new MetricConfig() + .samples(config.getInt(METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) + .recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG))); + metrics = new AivenAclAuthorizerMetrics(time, metricConfig); } @Override @@ -171,13 +195,16 @@ public final List authorize(final AuthorizableRequestContex final boolean verdict = cacheReference.get().get(principal, LegacyOperationNameFormatter.format(operation), resourceToCheck); + final var authResult = verdict ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED; + + metrics.recordLogAuthResult(authResult, operation, resourcePattern, principal); logAuthVerdict(verdict, operation, resourcePattern, principal, requestContext, action.logIfAllowed(), action.logIfDenied()); final var session = new Session(principal, requestContext.clientAddress()); auditor.addActivity(session, action.operation(), action.resourcePattern(), verdict); - result.add(verdict ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED); + result.add(authResult); } return result; } diff --git a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java index 60ed6ef..e8936c4 100644 --- a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java +++ b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.metrics.Sensor; import io.aiven.kafka.auth.audit.NoAuditor; import io.aiven.kafka.auth.audit.UserActivityAuditor; @@ -29,6 +30,7 @@ import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -46,6 +48,13 @@ void correctMinimalConfig() { assertEquals(NoAuditor.class, config.getAuditor().getClass()); assertTrue(config.logDenials()); assertTrue(config.listAclsEnabled()); + // metric configs + assertThat(config.getLong(AivenAclAuthorizerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG)) + .isEqualTo(30_000); + assertThat(config.getInt(AivenAclAuthorizerConfig.METRICS_NUM_SAMPLES_CONFIG)) + .isEqualTo(2); + assertThat(config.getString(AivenAclAuthorizerConfig.METRICS_RECORDING_LEVEL_CONFIG)) + .isEqualTo(Sensor.RecordingLevel.INFO.name()); } @Test @@ -57,6 +66,9 @@ void correctFullConfig() { userActivityProps.put("aiven.acl.authorizer.log.denials", "false"); userActivityProps.put("aiven.acl.authorizer.config.refresh.interval", "10"); userActivityProps.put("aiven.acl.authorizer.list.acls.enabled", "false"); + userActivityProps.put("aiven.acl.authorizer.metrics.sample.window.ms", "10000"); + userActivityProps.put("aiven.acl.authorizer.metrics.num.samples", "10"); + userActivityProps.put("aiven.acl.authorizer.metrics.recording.level", "DEBUG"); var config = new AivenAclAuthorizerConfig(userActivityProps); assertEquals("/test", config.getConfigFile().getAbsolutePath()); @@ -64,6 +76,13 @@ void correctFullConfig() { assertFalse(config.logDenials()); assertEquals(10, config.configRefreshInterval()); assertFalse(config.listAclsEnabled()); + // metrics + assertThat(config.getLong(AivenAclAuthorizerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG)) + .isEqualTo(10_000); + assertThat(config.getInt(AivenAclAuthorizerConfig.METRICS_NUM_SAMPLES_CONFIG)) + .isEqualTo(10); + assertThat(config.getString(AivenAclAuthorizerConfig.METRICS_RECORDING_LEVEL_CONFIG)) + .isEqualTo(Sensor.RecordingLevel.DEBUG.name()); final Map userActivityOpsProps = new HashMap<>(); userActivityOpsProps.put("aiven.acl.authorizer.configuration", "/test"); diff --git a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerMetricsTest.java b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerMetricsTest.java new file mode 100644 index 0000000..9ae902a --- /dev/null +++ b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerMetricsTest.java @@ -0,0 +1,124 @@ +/* + * Copyright 2024 Aiven Oy https://aiven.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.auth; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import java.lang.management.ManagementFactory; + +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.authorizer.AuthorizationResult; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class AivenAclAuthorizerMetricsTest { + + private static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer(); + + @Test + void recordMetrics() throws Exception { + final var metrics = new AivenAclAuthorizerMetrics(Time.SYSTEM, new MetricConfig()); + + final var name = "aiven.kafka.auth:type=auth-metrics"; + final var metricMBean = new ObjectName(name); + assertThat(MBEAN_SERVER.getAttribute(metricMBean, "auth-ops-allow-total")) + .isEqualTo(0.0); + assertThat(MBEAN_SERVER.getAttribute(metricMBean, "auth-ops-deny-total")) + .isEqualTo(0.0); + + metrics.recordLogAuthResult( + AuthorizationResult.ALLOWED, + AclOperation.ALTER, + new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL), + new KafkaPrincipal("USER", "u1")); + + assertThat(MBEAN_SERVER.getAttribute(metricMBean, "auth-ops-allow-total")) + .isEqualTo(1.0); + assertThat(MBEAN_SERVER.getAttribute(metricMBean, "auth-ops-deny-total")) + .isEqualTo(0.0); + + { + final var allowedByOpMBean = new ObjectName(name + ",operation=" + AclOperation.ALTER); + assertThat(MBEAN_SERVER.getAttribute(allowedByOpMBean, "auth-ops-allow-total")) + .isEqualTo(1.0); + } + + metrics.recordLogAuthResult( + AuthorizationResult.DENIED, + AclOperation.WRITE, + new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL), + new KafkaPrincipal("USER", "u1")); + + assertThat(MBEAN_SERVER.getAttribute(metricMBean, "auth-ops-allow-total")) + .isEqualTo(1.0); + assertThat(MBEAN_SERVER.getAttribute(metricMBean, "auth-ops-deny-total")) + .isEqualTo(1.0); + + { + final var deniedByOpMBean = new ObjectName(name + ",operation=" + AclOperation.WRITE + + ",resource=t1,principal=u1"); + assertThat(MBEAN_SERVER.getAttribute(deniedByOpMBean, "auth-ops-deny-total")) + .isEqualTo(1.0); + } + + metrics.recordLogAuthResult( + AuthorizationResult.DENIED, + AclOperation.WRITE, + new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL), + new KafkaPrincipal("USER", "u2")); + + assertThat(MBEAN_SERVER.getAttribute(metricMBean, "auth-ops-allow-total")) + .isEqualTo(1.0); + assertThat(MBEAN_SERVER.getAttribute(metricMBean, "auth-ops-deny-total")) + .isEqualTo(2.0); + + { + final var deniedByOpMBean = new ObjectName(name + ",operation=" + AclOperation.WRITE + + ",resource=t2,principal=u2"); + assertThat(MBEAN_SERVER.getAttribute(deniedByOpMBean, "auth-ops-deny-total")) + .isEqualTo(1.0); + } + + for (int i = 0; i < 10; i++) { + metrics.recordLogAuthResult( + AuthorizationResult.ALLOWED, + AclOperation.WRITE, + new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL), + new KafkaPrincipal("USER", "u2")); + } + + assertThat(MBEAN_SERVER.getAttribute(metricMBean, "auth-ops-allow-total")) + .isEqualTo(11.0); + assertThat(MBEAN_SERVER.getAttribute(metricMBean, "auth-ops-deny-total")) + .isEqualTo(2.0); + + { + final var allowedByOpMBean = new ObjectName(name + ",operation=" + AclOperation.WRITE); + assertThat(MBEAN_SERVER.getAttribute(allowedByOpMBean, "auth-ops-allow-total")) + .isEqualTo(10.0); + } + } +} diff --git a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerV2Test.java b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerV2Test.java index 6c05b69..c8caaa9 100644 --- a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerV2Test.java +++ b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerV2Test.java @@ -16,7 +16,11 @@ package io.aiven.kafka.auth; +import javax.management.MBeanServer; +import javax.management.ObjectName; + import java.io.IOException; +import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.nio.file.Files; import java.nio.file.Path; @@ -61,6 +65,7 @@ import static org.mockito.Mockito.when; public class AivenAclAuthorizerV2Test { + static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer(); static final ResourcePattern TOPIC_RESOURCE = new ResourcePattern( org.apache.kafka.common.resource.ResourceType.TOPIC, "Target", @@ -246,7 +251,7 @@ public void testUndefinedPrincipalType() throws IOException, InterruptedExceptio } @Test - public void testTopicPrefix() throws IOException, InterruptedException { + public void testTopicPrefix() throws Exception { Files.copy(this.getClass().getResourceAsStream("/acls_topic_prefix.json"), configFilePath); startAuthorizer(); @@ -255,10 +260,12 @@ public void testTopicPrefix() throws IOException, InterruptedException { "prefix-topic", PatternType.LITERAL )), true); + + checkMetrics(1.0, 0.0); } @Test - public void testDeny() throws IOException, InterruptedException { + public void testDeny() throws Exception { Files.copy(this.getClass().getResourceAsStream("/acls_deny.json"), configFilePath); startAuthorizer(); @@ -273,6 +280,8 @@ public void testDeny() throws IOException, InterruptedException { "topic-denied", PatternType.LITERAL )), false); + + checkMetrics(1.0, 1.0); } @Test @@ -431,4 +440,13 @@ private void checkSingleAction(final AuthorizableRequestContext requestCtx, final List result = auth.authorize(requestCtx, List.of(action)); assertThat(result).isEqualTo(List.of(allowed ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)); } + + private static void checkMetrics(final double allowed, final double denied) throws Exception { + final var name = "aiven.kafka.auth:type=auth-metrics"; + final var metricMBean = new ObjectName(name); + assertThat(MBEAN_SERVER.getAttribute(metricMBean, "auth-ops-allow-total")) + .isEqualTo(allowed); + assertThat(MBEAN_SERVER.getAttribute(metricMBean, "auth-ops-deny-total")) + .isEqualTo(denied); + } }