From 011518c095a9f97aacd217cb7d5eae5832a08607 Mon Sep 17 00:00:00 2001 From: Anatolii Popov Date: Wed, 28 Jul 2021 16:43:18 +0300 Subject: [PATCH 1/2] Refactored AivenAclAuthorizer. Added config.reloading.interval property. --- build.gradle | 1 + checkstyle/checkstyle-suppressions.xml | 1 + .../kafka/auth/AivenAclAuthorizerConfig.java | 17 +- .../kafka/auth/AivenAclAuthorizerV2.java | 227 ++++++++++++++++++ .../io/aiven/kafka/auth/VerdictCache.java | 49 ++++ .../auth/AivenAclAuthorizerConfigTest.java | 4 + .../kafka/auth/AivenAclAuthorizerV2Test.java | 187 +++++++++++++++ 7 files changed, 483 insertions(+), 3 deletions(-) create mode 100644 src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java create mode 100644 src/main/java/io/aiven/kafka/auth/VerdictCache.java create mode 100644 src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerV2Test.java diff --git a/build.gradle b/build.gradle index 14b4a72..2dfe633 100644 --- a/build.gradle +++ b/build.gradle @@ -79,6 +79,7 @@ dependencies { testImplementation "org.openjdk.jmh:jmh-core:1.28" testImplementation "org.openjdk.jmh:jmh-generator-annprocess:1.28" + testImplementation "org.assertj:assertj-core:3.20.2" jmh "org.apache.kafka:kafka_2.12:2.1.1" } diff --git a/checkstyle/checkstyle-suppressions.xml b/checkstyle/checkstyle-suppressions.xml index dad08fc..a81ee98 100644 --- a/checkstyle/checkstyle-suppressions.xml +++ b/checkstyle/checkstyle-suppressions.xml @@ -12,6 +12,7 @@ + diff --git a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java index ab8e77f..a4076d1 100644 --- a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java +++ b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerConfig.java @@ -29,6 +29,7 @@ public final class AivenAclAuthorizerConfig extends AbstractConfig { private static final String CONFIGURATION_CONF = "aiven.acl.authorizer.configuration"; private static final String AUDITOR_CLASS_NAME_CONF = "aiven.acl.authorizer.auditor.class.name"; private static final String LOG_DENIALS_CONF = "aiven.acl.authorizer.log.denials"; + private static final String CONFIG_REFRESH_CONF = "aiven.acl.authorizer.config.refresh.interval"; public AivenAclAuthorizerConfig(final Map originals) { super(configDef(), originals); @@ -54,18 +55,28 @@ public static ConfigDef configDef() { true, ConfigDef.Importance.LOW, "Whether to log denials on INFO level" + ).define( + CONFIG_REFRESH_CONF, + ConfigDef.Type.INT, + 10_000, + ConfigDef.Importance.LOW, + "The interval between ACL reloads" ); } - public final File getConfigFile() { + public File getConfigFile() { return new File(getString(CONFIGURATION_CONF)); } - public final AuditorAPI getAuditor() { + public AuditorAPI getAuditor() { return getConfiguredInstance(AUDITOR_CLASS_NAME_CONF, AuditorAPI.class); } - public final boolean logDenials() { + public boolean logDenials() { return getBoolean(LOG_DENIALS_CONF); } + + public int configRefreshInterval() { + return getInt(CONFIG_REFRESH_CONF); + } } diff --git a/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java new file mode 100644 index 0000000..cec3692 --- /dev/null +++ b/src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java @@ -0,0 +1,227 @@ +/* + * Copyright 2019 Aiven Oy https://aiven.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.auth; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.common.security.auth.KafkaPrincipal; + +import io.aiven.kafka.auth.audit.AuditorAPI; +import io.aiven.kafka.auth.json.AivenAcl; +import io.aiven.kafka.auth.json.reader.AclJsonReader; +import io.aiven.kafka.auth.json.reader.JsonReaderException; + +import kafka.network.RequestChannel.Session; +import kafka.security.auth.Acl; +import kafka.security.auth.Authorizer; +import kafka.security.auth.Operation; +import kafka.security.auth.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AivenAclAuthorizerV2 implements Authorizer { + + private static final Logger LOGGER = LoggerFactory.getLogger(AivenAclAuthorizerV2.class); + private File configFile; + private AuditorAPI auditor; + private boolean logDenials; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final WatchService watchService; + private final AtomicReference cacheReference = new AtomicReference<>(); + + public AivenAclAuthorizerV2() { + try { + watchService = FileSystems.getDefault().newWatchService(); + } catch (final IOException e) { + LOGGER.error("Failed to initialize WatchService", e); + throw new RuntimeException(e); + } + } + + @Override + public void configure(final java.util.Map configs) { + final AivenAclAuthorizerConfig config = new AivenAclAuthorizerConfig(configs); + + auditor = config.getAuditor(); + logDenials = config.logDenials(); + + configFile = config.getConfigFile(); + final AclJsonReader jsonReader = new AclJsonReader(configFile.toPath()); + cacheReference.set(VerdictCache.create(loadAcls(jsonReader))); + final AtomicReference watchKeyReference = new AtomicReference<>(subscribeToAclChanges(configFile)); + scheduledExecutorService.scheduleWithFixedDelay(() -> { + final WatchKey watchKey = watchKeyReference.get(); + if (watchKey != null) { + final List> watchEvents = watchKey.pollEvents(); + + watchEvents.stream().filter(watchEvent -> { + @SuppressWarnings("unchecked") + final Path path = ((WatchEvent) watchEvent).context(); + return configFile.toPath().getFileName().equals(path); + }).findFirst().ifPresent(watchEvent -> { + LOGGER.info("{}: {}, Modified: {}", + watchEvent.kind(), watchEvent.context(), configFile.lastModified()); + cacheReference.set(VerdictCache.create(loadAcls(jsonReader))); + }); + if (!watchKey.reset()) { + watchKeyReference.compareAndSet(watchKey, subscribeToAclChanges(configFile)); + } + } else { + watchKeyReference.set(subscribeToAclChanges(configFile)); + } + }, 0, config.configRefreshInterval(), TimeUnit.MILLISECONDS); + } + + private WatchKey subscribeToAclChanges(final File configFile) { + try { + return configFile.toPath().toAbsolutePath().getParent() + .register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE); + } catch (final IOException e) { + LOGGER.error("Failed to subscribe to ACL configuration changes", e); + return null; + } + } + + @Override + public void close() { + auditor.stop(); + scheduledExecutorService.shutdownNow(); + try { + watchService.close(); + } catch (final IOException e) { + LOGGER.error("Failed to stop watch service", e); + } + } + + @Override + public boolean authorize(final Session session, + final Operation operation, + final Resource resource) { + final KafkaPrincipal principal = + Objects.requireNonNullElse(session.principal(), KafkaPrincipal.ANONYMOUS); + final String resourceToCheck = + resource.resourceType() + ":" + resource.name(); + final boolean verdict = + checkAcl( + principal.getPrincipalType(), + principal.getName(), + operation.name(), + resourceToCheck + ); + auditor.addActivity(session, operation, resource, verdict); + return verdict; + } + + + /** + * Read ACL entries from config file. + */ + private List loadAcls(final AclJsonReader jsonReader) { + LOGGER.info("Reloading ACL configuration..."); + try { + return jsonReader.read(); + } catch (final JsonReaderException ex) { + LOGGER.error("Failed to load ACL config file", ex); + return Collections.emptyList(); + } + } + + /** + * Authorize a single request. + */ + private boolean checkAcl(final String principalType, + final String principalName, + final String operation, + final String resource) { + + final boolean verdict = cacheReference.get().get(principalType, principalName, operation, resource); + logAuthVerdict(verdict, operation, resource, principalType, principalName); + return verdict; + } + + private void logAuthVerdict(final boolean verdict, + final String operation, + final String resource, + final String principalType, + final String principalName) { + if (verdict) { + LOGGER.debug("[ALLOW] Auth request {} on {} by {} {}", + operation, resource, principalType, principalName); + } else { + if (logDenials) { + LOGGER.info("[DENY] Auth request {} on {} by {} {}", + operation, resource, principalType, principalName); + } else { + LOGGER.debug("[DENY] Auth request {} on {} by {} {}", + operation, resource, principalType, principalName); + } + } + } + + @Override + public scala.collection.immutable.Set getAcls(final Resource resource) { + LOGGER.error("getAcls(Resource) is not implemented"); + return new scala.collection.immutable.HashSet<>(); + } + + @Override + public scala.collection.immutable.Map> getAcls( + final KafkaPrincipal principal) { + LOGGER.error("getAcls(KafkaPrincipal) is not implemented"); + return new scala.collection.immutable.HashMap<>(); + } + + @Override + public scala.collection.immutable.Map> getAcls() { + LOGGER.error("getAcls() is not implemented"); + return new scala.collection.immutable.HashMap<>(); + } + + @Override + public boolean removeAcls(final scala.collection.immutable.Set acls, + final Resource resource) { + LOGGER.error("removeAcls(Set, Resource) is not implemented"); + return false; + } + + @Override + public boolean removeAcls(final Resource resource) { + LOGGER.error("removeAcls(Resource) is not implemented"); + return false; + } + + @Override + public void addAcls(final scala.collection.immutable.Set acls, + final Resource resource) { + LOGGER.error("addAcls(Set, Resource) is not implemented"); + } +} diff --git a/src/main/java/io/aiven/kafka/auth/VerdictCache.java b/src/main/java/io/aiven/kafka/auth/VerdictCache.java new file mode 100644 index 0000000..a268773 --- /dev/null +++ b/src/main/java/io/aiven/kafka/auth/VerdictCache.java @@ -0,0 +1,49 @@ +/* + * Copyright 2021 Aiven Oy https://aiven.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.auth; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import io.aiven.kafka.auth.json.AivenAcl; + +public class VerdictCache { + private final List aclEntries; + private final Map cache = new ConcurrentHashMap<>(); + + private VerdictCache(final List aclEntries) { + this.aclEntries = aclEntries; + } + + public boolean get(final String principalType, + final String principalName, + final String operation, + final String resource) { + if (aclEntries != null) { + final String cacheKey = resource + "|" + operation + "|" + principalName + "|" + principalType; + return cache.computeIfAbsent(cacheKey, key -> aclEntries.stream() + .anyMatch(aclEntry -> aclEntry.check(principalType, principalName, operation, resource))); + } else { + return false; + } + } + + public static VerdictCache create(final List aclEntries) { + return new VerdictCache(aclEntries); + } +} diff --git a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java index b53acd0..3b4f21b 100644 --- a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java +++ b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerConfigTest.java @@ -54,11 +54,13 @@ void correctFullConfig() { userActivityProps.put("aiven.acl.authorizer.auditor.class.name", UserActivityAuditor.class.getName()); userActivityProps.put("aiven.acl.authorizer.auditor.aggregation.period", "123"); userActivityProps.put("aiven.acl.authorizer.log.denials", "false"); + userActivityProps.put("aiven.acl.authorizer.config.refresh.interval", "10"); var config = new AivenAclAuthorizerConfig(userActivityProps); assertEquals("/test", config.getConfigFile().getAbsolutePath()); assertEquals(UserActivityAuditor.class, config.getAuditor().getClass()); assertFalse(config.logDenials()); + assertEquals(10, config.configRefreshInterval()); final Map userActivityOpsProps = new HashMap<>(); userActivityOpsProps.put("aiven.acl.authorizer.configuration", "/test"); @@ -66,11 +68,13 @@ void correctFullConfig() { UserOperationsActivityAuditor.class.getName()); userActivityOpsProps.put("aiven.acl.authorizer.auditor.aggregation.period", "123"); userActivityOpsProps.put("aiven.acl.authorizer.log.denials", "false"); + userActivityOpsProps.put("aiven.acl.authorizer.config.refresh.interval", "10"); config = new AivenAclAuthorizerConfig(userActivityOpsProps); assertEquals("/test", config.getConfigFile().getAbsolutePath()); assertEquals(UserOperationsActivityAuditor.class, config.getAuditor().getClass()); assertFalse(config.logDenials()); + assertEquals(10, config.configRefreshInterval()); } @Test diff --git a/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerV2Test.java b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerV2Test.java new file mode 100644 index 0000000..66064b1 --- /dev/null +++ b/src/test/java/io/aiven/kafka/auth/AivenAclAuthorizerV2Test.java @@ -0,0 +1,187 @@ +/* + * Copyright 2019 Aiven Oy https://aiven.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.auth; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; + +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.security.auth.KafkaPrincipal; + +import kafka.network.RequestChannel.Session; +import kafka.security.auth.Operation; +import kafka.security.auth.Resource; +import kafka.security.auth.ResourceType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import static org.assertj.core.api.Assertions.assertThat; + +public class AivenAclAuthorizerV2Test { + static final Resource TOPIC_RESOURCE = new Resource( + ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.TOPIC), + "Target", + PatternType.LITERAL + ); + static final Resource GROUP_RESOURCE = new Resource( + ResourceType.fromJava(org.apache.kafka.common.resource.ResourceType.GROUP), + "Target", + PatternType.LITERAL + ); + static final Operation READ_OPERATION = Operation.fromJava(AclOperation.READ); + static final Operation CREATE_OPERATION = Operation.fromJava(AclOperation.CREATE); + static final String ACL_JSON = + "[{\"principal_type\":\"User\",\"principal\":\"^pass$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}]"; + static final String ACL_JSON_NOTYPE = + "[{\"principal\":\"^pass$\",\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}]"; + static final String ACL_JSON_LONG = "[" + + "{\"principal_type\":\"User\",\"principal\":\"^pass-0$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal_type\":\"User\",\"principal\":\"^pass-1$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal_type\":\"User\",\"principal\":\"^pass-2$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal_type\":\"User\",\"principal\":\"^pass-3$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal_type\":\"User\",\"principal\":\"^pass-4$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal_type\":\"User\",\"principal\":\"^pass-5$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal_type\":\"User\",\"principal\":\"^pass-6$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal_type\":\"User\",\"principal\":\"^pass-7$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal_type\":\"User\",\"principal\":\"^pass-8$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal_type\":\"User\",\"principal\":\"^pass-9$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal_type\":\"User\",\"principal\":\"^pass-10$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal_type\":\"User\",\"principal\":\"^pass-11$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal_type\":\"User\",\"principal\":\"^pass-12$\"," + + "\"operation\":\"^Read$\",\"resource\":\"^Topic:(.*)$\"}," + + "{\"principal\":\"^pass-notype$\",\"operation\":\"^Read$\"," + + "\"resource\":\"^Topic:(.*)$\"}" + + "]"; + + @TempDir + Path tmpDir; + Path configFilePath; + final AivenAclAuthorizerV2 auth = new AivenAclAuthorizerV2(); + Map configs; + + @BeforeEach + void setUp() { + configFilePath = tmpDir.resolve("acl.json"); + configs = Map.of( + "aiven.acl.authorizer.configuration", configFilePath.toString(), + "aiven.acl.authorizer.config.refresh.interval", "10"); + auth.configure(configs); + } + + @AfterEach + void tearDown() { + auth.close(); + } + + @Test + public void testAivenAclAuthorizer() throws IOException, InterruptedException { + Files.write(configFilePath, ACL_JSON.getBytes()); + Thread.sleep(100); + + // basic ACL checks + assertThat(auth.authorize(startSessionFor("User", "pass"), READ_OPERATION, TOPIC_RESOURCE)).isTrue(); + assertThat(auth.authorize(startSessionFor("User", "fail"), READ_OPERATION, TOPIC_RESOURCE)).isFalse(); + assertThat(auth.authorize(startSessionFor("User", "pass"), READ_OPERATION, GROUP_RESOURCE)).isFalse(); + assertThat(auth.authorize(startSessionFor("User", "pass"), CREATE_OPERATION, TOPIC_RESOURCE)).isFalse(); + assertThat(auth.authorize(startSessionFor("NonUser", "pass"), READ_OPERATION, TOPIC_RESOURCE)).isFalse(); + + // Check support for undefined principal type + Files.write(configFilePath, ACL_JSON_NOTYPE.getBytes()); + Thread.sleep(100); + + assertThat(auth.authorize(startSessionFor("User", "pass"), READ_OPERATION, TOPIC_RESOURCE)).isTrue(); + assertThat(auth.authorize(startSessionFor("NonUser", "pass"), READ_OPERATION, TOPIC_RESOURCE)).isTrue(); + + Files.write(configFilePath, ACL_JSON_LONG.getBytes()); + Thread.sleep(100); + + // first iteration without cache + assertThat(auth.authorize(startSessionFor("User", "pass-1"), READ_OPERATION, TOPIC_RESOURCE)).isTrue(); + assertThat(auth.authorize(startSessionFor("User", "fail-1"), READ_OPERATION, TOPIC_RESOURCE)).isFalse(); + + // second iteration from cache + assertThat(auth.authorize(startSessionFor("User", "pass-1"), READ_OPERATION, TOPIC_RESOURCE)).isTrue(); + assertThat(auth.authorize(startSessionFor("User", "fail-1"), READ_OPERATION, TOPIC_RESOURCE)).isFalse(); + + // Checking that wrong configuration leads to failed auth + Files.write(configFilePath, "]".getBytes()); + Thread.sleep(100); + assertThat(auth.authorize(startSessionFor("User", "pass-1"), READ_OPERATION, TOPIC_RESOURCE)).isFalse(); + assertThat(auth.authorize(startSessionFor("User", "fail-1"), READ_OPERATION, TOPIC_RESOURCE)).isFalse(); + + // Checking that empty configuration leads to failed auth + Files.write(configFilePath, "".getBytes()); + Thread.sleep(100); + assertThat(auth.authorize(startSessionFor("User", "pass-1"), READ_OPERATION, TOPIC_RESOURCE)).isFalse(); + assertThat(auth.authorize(startSessionFor("User", "fail-1"), READ_OPERATION, TOPIC_RESOURCE)).isFalse(); + } + + @Test + public void testConfigReloading() throws IOException, InterruptedException { + auth.configure(configs); + assertThat(auth.authorize(startSessionFor("User", "pass"), READ_OPERATION, TOPIC_RESOURCE)).isFalse(); + + // check that config is reloaded after file modification + Files.write(configFilePath, ACL_JSON_LONG.getBytes()); + Thread.sleep(100); + + assertThat(auth.authorize(startSessionFor("User", "pass-1"), READ_OPERATION, TOPIC_RESOURCE)).isTrue(); + + // check that config is reloaded after file deletion + assertThat(configFilePath.toFile().delete()).isTrue(); + Thread.sleep(100); + + assertThat(auth.authorize(startSessionFor("User", "pass-1"), READ_OPERATION, TOPIC_RESOURCE)).isFalse(); + + // check that config is reloaded after directory deletion + assertThat(Files.deleteIfExists(configFilePath.getParent().toAbsolutePath())).isTrue(); + Thread.sleep(100); + + assertThat(auth.authorize(startSessionFor("User", "pass-1"), READ_OPERATION, TOPIC_RESOURCE)).isFalse(); + + // check that config reloaded after file and directory re-creation + assertThat(tmpDir.toFile().mkdir()).isTrue(); + Thread.sleep(100); + Files.write(configFilePath, ACL_JSON.getBytes()); + Thread.sleep(100); + assertThat(auth.authorize(startSessionFor("User", "pass"), READ_OPERATION, TOPIC_RESOURCE)).isTrue(); + } + + private Session startSessionFor(final String principalType, final String name) throws UnknownHostException { + return new Session(new KafkaPrincipal(principalType, name), InetAddress.getLocalHost()); + } +} From 54fd7829109aa10a25ba2a23520e207ada2a3e82 Mon Sep 17 00:00:00 2001 From: Anatolii Popov Date: Tue, 10 Aug 2021 17:22:43 +0300 Subject: [PATCH 2/2] Changed build log level to INFO --- .github/workflows/codeql-analysis.yml | 2 +- .github/workflows/master_push_workflow.yml | 2 +- .github/workflows/pull_request_workflow.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index aad73a5..3d1c8c5 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -68,7 +68,7 @@ jobs: java-version: 11 - name: Build with Gradle - run: ./gradlew build + run: ./gradlew build -i - name: Perform CodeQL Analysis uses: github/codeql-action/analyze@v1 diff --git a/.github/workflows/master_push_workflow.yml b/.github/workflows/master_push_workflow.yml index 58b5e0b..78f3d4d 100644 --- a/.github/workflows/master_push_workflow.yml +++ b/.github/workflows/master_push_workflow.yml @@ -17,4 +17,4 @@ jobs: with: java-version: 11 - name: Build with Gradle - run: ./gradlew build + run: ./gradlew build -i diff --git a/.github/workflows/pull_request_workflow.yml b/.github/workflows/pull_request_workflow.yml index 7b4314c..a1dc681 100644 --- a/.github/workflows/pull_request_workflow.yml +++ b/.github/workflows/pull_request_workflow.yml @@ -16,7 +16,7 @@ jobs: with: java-version: 11 - name: Build with Gradle - run: ./gradlew build + run: ./gradlew build -i - name: Publish Test Report uses: mikepenz/action-junit-report@v2 with: