diff --git a/kafka-streams/src/main/java/io/confluent/developer/TopicAclConfigurer.java b/kafka-streams/src/main/java/io/confluent/developer/TopicAclConfigurer.java new file mode 100644 index 0000000..f43133a --- /dev/null +++ b/kafka-streams/src/main/java/io/confluent/developer/TopicAclConfigurer.java @@ -0,0 +1,49 @@ +package io.confluent.developer; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +public class TopicAclConfigurer { + static public void configureACLs(Admin adminClient, List topics, String serviceAccountPrincipal) { + topics.forEach( + topic -> { + List aclBindings = Arrays.asList( + new AclBinding( + new ResourcePattern(ResourceType.TOPIC, topic.name(), PatternType.LITERAL), + new AccessControlEntry( + serviceAccountPrincipal, + "*", + AclOperation.READ, + AclPermissionType.ALLOW + ) + ), + new AclBinding( + new ResourcePattern(ResourceType.TOPIC, topic.name(), PatternType.LITERAL), + new AccessControlEntry( + serviceAccountPrincipal, + "*", + AclOperation.WRITE, + AclPermissionType.ALLOW + ) + ) + ); + try { + adminClient.createAcls(aclBindings).all().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + ); + } +} diff --git a/kafka-streams/src/main/java/io/confluent/developer/aggregate/TopicLoader.java b/kafka-streams/src/main/java/io/confluent/developer/aggregate/TopicLoader.java index 811fa85..82afe47 100644 --- a/kafka-streams/src/main/java/io/confluent/developer/aggregate/TopicLoader.java +++ b/kafka-streams/src/main/java/io/confluent/developer/aggregate/TopicLoader.java @@ -1,6 +1,7 @@ package io.confluent.developer.aggregate; import io.confluent.developer.StreamsUtils; +import io.confluent.developer.TopicAclConfigurer; import io.confluent.developer.avro.ElectronicOrder; import io.confluent.kafka.serializers.KafkaAvroSerializer; import org.apache.kafka.clients.admin.Admin; @@ -33,6 +34,8 @@ public static void runProducer() throws IOException { final String outputTopic = properties.getProperty("aggregate.output.topic"); var topics = List.of(StreamsUtils.createTopic(inputTopic), StreamsUtils.createTopic(outputTopic)); adminClient.createTopics(topics); + var serviceAccountName ="User:"+properties.getProperty("service.account.name"); + TopicAclConfigurer.configureACLs(adminClient, topics, serviceAccountName); Callback callback = StreamsUtils.callback();