diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java
index ef58f94691a16..6bdf533c4174f 100644
--- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java
+++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java
@@ -71,6 +71,7 @@
 import io.quarkus.deployment.builditem.FeatureBuildItem;
 import io.quarkus.deployment.builditem.IndexDependencyBuildItem;
 import io.quarkus.deployment.builditem.LogCategoryBuildItem;
+import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
 import io.quarkus.deployment.builditem.RuntimeConfigSetupCompleteBuildItem;
 import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem;
 import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
@@ -158,6 +159,20 @@ void contributeClassesToIndex(BuildProducer<AdditionalIndexedClassesBuildItem> additionalIndexedClasses,
         indexDependency.produce(new IndexDependencyBuildItem("org.apache.kafka", "kafka-clients"));
     }
 
+    @BuildStep
+    void relaxSaslElytron(BuildProducer<RunTimeConfigurationDefaultBuildItem> config) {
+        // If Elytron is on the classpath and the Kafka connection uses SASL, the Elytron client SASL implementation
+        // is stricter than what Kafka expects. In this case, configure the SASL client to relax some constraints.
+        // See https://github.com/quarkusio/quarkus/issues/20088.
+        try {
+            Class.forName("org.wildfly.security.sasl.gssapi.AbstractGssapiMechanism", false,
+                    Thread.currentThread().getContextClassLoader());
+            config.produce(new RunTimeConfigurationDefaultBuildItem("kafka.wildfly.sasl.relax-compliance", "true"));
+        } catch (Exception e) {
+            // AbstractGssapiMechanism is not on the classpath, do not set wildfly.sasl.relax-compliance
+        }
+    }
+
     @BuildStep
     public void build(
             KafkaBuildTimeConfig config,
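For context: RunTimeConfigurationDefaultBuildItem contributes an overridable default, not a hard-coded value, so applications keep the last word. A minimal sketch of what this default amounts to at runtime, assuming the "kafka." prefix stripping performed by KafkaRuntimeConfigProducer (changed below) — illustrative only, not code from the PR:

    // The "kafka." prefix is stripped before the map reaches the Kafka client, so the
    // client config ends up carrying the key that Elytron's SASL client factory reads
    // (WildFlySasl.RELAX_COMPLIANCE = "wildfly.sasl.relax-compliance").
    Map<String, Object> clientConfig = new HashMap<>();
    clientConfig.put("wildfly.sasl.relax-compliance", "true");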
diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java
index 1bedf2d4dcc6b..2be14e5717251 100644
--- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java
+++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java
@@ -32,8 +32,10 @@ public Map<String, Object> createKafkaRuntimeConfig(Config config, ApplicationConfig
             if (!propertyNameLowerCase.startsWith(CONFIG_PREFIX)) {
                 continue;
             }
+            // Replace '_' with '.': Kafka property names use dots, whereas environment variables
+            // use underscores for every special character, so map underscores back to dots.
             String effectivePropertyName = propertyNameLowerCase.substring(CONFIG_PREFIX.length() + 1).toLowerCase()
-                    .replaceAll("[^a-z0-9.]", ".");
+                    .replace("_", ".");
             String value = config.getOptionalValue(propertyName, String.class).orElse("");
             result.put(effectivePropertyName, value);
         }
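To see why narrowing replaceAll to replace("_", ".") matters here: the old regex rewrote every character outside [a-z0-9.] to a dot, which presumably would also mangle the hyphen in the kafka.wildfly.sasl.relax-compliance default produced above. A small self-contained sketch (illustrative only):

    public class PropertyNameMappingSketch {
        public static void main(String[] args) {
            String stripped = "kafka.wildfly.sasl.relax-compliance"
                    .substring("kafka".length() + 1).toLowerCase();
            // Old behavior: the hyphen is rewritten too, so the key Elytron expects is lost.
            System.out.println(stripped.replaceAll("[^a-z0-9.]", ".")); // wildfly.sasl.relax.compliance
            // New behavior: only underscores (the environment-variable separator) become dots.
            System.out.println(stripped.replace("_", "."));             // wildfly.sasl.relax-compliance
        }
    }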
diff --git a/integration-tests/kafka-sasl-elytron/pom.xml b/integration-tests/kafka-sasl-elytron/pom.xml
new file mode 100644
index 0000000000000..eb72b93c40400
--- /dev/null
+++ b/integration-tests/kafka-sasl-elytron/pom.xml
@@ -0,0 +1,199 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>quarkus-integration-tests-parent</artifactId>
+        <groupId>io.quarkus</groupId>
+        <version>999-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>quarkus-integration-test-kafka-sasl-elytron</artifactId>
+    <name>Quarkus - Integration Tests - Kafka SASL with Elytron</name>
+    <description>The Apache Kafka with SASL (Elytron) integration tests module</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-integration-test-class-transformer</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-integration-test-shared-library</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-reactive</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-reactive-jackson</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-infinispan-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-junit5</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.rest-assured</groupId>
+            <artifactId>rest-assured</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.strimzi</groupId>
+            <artifactId>strimzi-test-container</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-integration-test-class-transformer-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-smallrye-reactive-messaging-kafka-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-infinispan-client-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-reactive-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-reactive-jackson-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>io.quarkus</groupId>
+                <artifactId>quarkus-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>build</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <configuration>
+                    <skipTests>true</skipTests>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skipTests>true</skipTests>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>test-kafka</id>
+            <activation>
+                <property>
+                    <name>test-containers</name>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <skipTests>false</skipTests>
+                            <systemPropertyVariables>
+                                <java.security.krb5.conf>target/krb5.conf</java.security.krb5.conf>
+                            </systemPropertyVariables>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>
diff --git a/integration-tests/kafka-sasl-elytron/src/main/java/io/quarkus/it/kafka/sasl/KafkaConsumer.java b/integration-tests/kafka-sasl-elytron/src/main/java/io/quarkus/it/kafka/sasl/KafkaConsumer.java
new file mode 100644
index 0000000000000..28c8ab686c947
--- /dev/null
+++ b/integration-tests/kafka-sasl-elytron/src/main/java/io/quarkus/it/kafka/sasl/KafkaConsumer.java
@@ -0,0 +1,27 @@
+package io.quarkus.it.kafka.sasl;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.enterprise.context.ApplicationScoped;
+
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.jboss.logging.Logger;
+
+@ApplicationScoped
+public class KafkaConsumer {
+
+    private final Logger log = Logger.getLogger(KafkaConsumer.class);
+
+    private final List<String> list = new CopyOnWriteArrayList<>();
+
+    @Incoming("in")
+    public void consume(String value) {
+        log.info(value);
+        list.add(value);
+    }
+
+    public List<String> getValues() {
+        return list;
+    }
+}
diff --git a/integration-tests/kafka-sasl-elytron/src/main/java/io/quarkus/it/kafka/sasl/KafkaProducer.java b/integration-tests/kafka-sasl-elytron/src/main/java/io/quarkus/it/kafka/sasl/KafkaProducer.java
new file mode 100644
index 0000000000000..08676d4c4025c
--- /dev/null
+++ b/integration-tests/kafka-sasl-elytron/src/main/java/io/quarkus/it/kafka/sasl/KafkaProducer.java
@@ -0,0 +1,17 @@
+package io.quarkus.it.kafka.sasl;
+
+import javax.enterprise.context.ApplicationScoped;
+
+import org.eclipse.microprofile.reactive.messaging.Outgoing;
+
+import io.smallrye.mutiny.Multi;
+
+@ApplicationScoped
+public class KafkaProducer {
+
+    @Outgoing("out")
+    public Multi<String> generatePeople() {
+        return Multi.createFrom().items("test1", "test2");
+    }
+
+}
diff --git a/integration-tests/kafka-sasl-elytron/src/main/java/io/quarkus/it/kafka/sasl/KafkaResource.java b/integration-tests/kafka-sasl-elytron/src/main/java/io/quarkus/it/kafka/sasl/KafkaResource.java
new file mode 100644
index 0000000000000..0b5a93284bf1a
--- /dev/null
+++ b/integration-tests/kafka-sasl-elytron/src/main/java/io/quarkus/it/kafka/sasl/KafkaResource.java
@@ -0,0 +1,22 @@
+package io.quarkus.it.kafka.sasl;
+
+import java.util.List;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/kafka")
+public class KafkaResource {
+
+    @Inject
+    KafkaConsumer consumer;
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public List<String> getValues() {
+        return consumer.getValues();
+    }
+}
\ No newline at end of file
diff --git a/integration-tests/kafka-sasl-elytron/src/main/resources/application.properties b/integration-tests/kafka-sasl-elytron/src/main/resources/application.properties
new file mode 100644
index 0000000000000..8b0689bc04437
--- /dev/null
+++ b/integration-tests/kafka-sasl-elytron/src/main/resources/application.properties
@@ -0,0 +1,23 @@
+quarkus.log.category.kafka.level=WARN
+quarkus.log.category.\"org.apache.kafka\".level=WARN
+quarkus.log.category.\"org.apache.zookeeper\".level=WARN
+
+mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
+mp.messaging.connector.smallrye-kafka.sasl.mechanism=GSSAPI
+mp.messaging.connector.smallrye-kafka.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true debug=true serviceName="kafka" keyTab="src/test/resources/client.keytab" principal="client/localhost@EXAMPLE.COM";
+mp.messaging.connector.smallrye-kafka.sasl.kerberos.service.name=kafka
+mp.messaging.connector.smallrye-kafka.ssl.endpoint.identification.algorithm=https
+
+mp.messaging.outgoing.out.connector=smallrye-kafka
+mp.messaging.outgoing.out.topic=mytopic
+mp.messaging.outgoing.out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+mp.messaging.incoming.in.connector=smallrye-kafka
+mp.messaging.incoming.in.topic=mytopic
+mp.messaging.incoming.in.auto.offset.reset=earliest
+mp.messaging.incoming.in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+# enable health check
+quarkus.kafka.health.enabled=true
+
+# using QuarkusTestResourceLifecycleManager in this test
+quarkus.kafka.devservices.enabled=false
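The mp.messaging.connector.smallrye-kafka.* entries above apply to every channel that uses the connector. For readers unfamiliar with the mapping, a hedged sketch of the equivalent plain Kafka client settings (values copied from the file above; this fragment is not code from the PR):

    Properties props = new Properties(); // java.util.Properties
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "GSSAPI");
    props.put("sasl.kerberos.service.name", "kafka");
    props.put("sasl.jaas.config",
            "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true "
                    + "serviceName=\"kafka\" keyTab=\"src/test/resources/client.keytab\" "
                    + "principal=\"client/localhost@EXAMPLE.COM\";");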
diff --git a/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/KafkaSaslIT.java b/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/KafkaSaslIT.java
new file mode 100644
index 0000000000000..a87833213cbc1
--- /dev/null
+++ b/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/KafkaSaslIT.java
@@ -0,0 +1,26 @@
+package io.quarkus.it.kafka;
+
+import static io.restassured.RestAssured.get;
+import static org.awaitility.Awaitility.await;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.NativeImageTest;
+import io.restassured.common.mapper.TypeRef;
+
+@NativeImageTest
+@QuarkusTestResource(KafkaSaslTestResource.class)
+public class KafkaSaslIT {
+
+    protected static final TypeRef<List<String>> TYPE_REF = new TypeRef<List<String>>() {
+    };
+
+    @Test
+    public void test() {
+        await().untilAsserted(() -> Assertions.assertEquals(2, get("/kafka").as(TYPE_REF).size()));
+    }
+}
diff --git a/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/KafkaSaslTest.java b/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/KafkaSaslTest.java
new file mode 100644
index 0000000000000..6532f0c2586f9
--- /dev/null
+++ b/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/KafkaSaslTest.java
@@ -0,0 +1,26 @@
+package io.quarkus.it.kafka;
+
+import static io.restassured.RestAssured.get;
+import static org.awaitility.Awaitility.await;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.restassured.common.mapper.TypeRef;
+
+@QuarkusTest
+@QuarkusTestResource(KafkaSaslTestResource.class)
+public class KafkaSaslTest {
+
+    protected static final TypeRef<List<String>> TYPE_REF = new TypeRef<List<String>>() {
+    };
+
+    @Test
+    public void test() {
+        await().untilAsserted(() -> Assertions.assertEquals(2, get("/kafka").as(TYPE_REF).size()));
+    }
+}
diff --git a/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/KafkaSaslTestResource.java b/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/KafkaSaslTestResource.java
new file mode 100644
index 0000000000000..9009d6e7c32b9
--- /dev/null
+++ b/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/KafkaSaslTestResource.java
@@ -0,0 +1,55 @@
+package io.quarkus.it.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.logging.Logger;
+
+import io.quarkus.it.kafka.containers.KafkaContainer;
+import io.quarkus.it.kafka.containers.KerberosContainer;
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+
+public class KafkaSaslTestResource implements QuarkusTestResourceLifecycleManager {
+
+    private final Logger log = Logger.getLogger(KafkaSaslTestResource.class);
+
+    private KafkaContainer kafka;
+    private KerberosContainer kerberos;
+
+    @Override
+    public Map<String, String> start() {
+
+        Map<String, String> properties = new HashMap<>();
+
+        //Start kerberos container
+        kerberos = new KerberosContainer("gcavalcante8808/krb5-server");
+        kerberos.start();
+        log.info(kerberos.getLogs());
+        kerberos.createTestPrincipals();
+        kerberos.createKrb5File();
+        properties.put("java.security.krb5.conf", "src/test/resources/krb5.conf");
+
+        //Start kafka container
+        kafka = new KafkaContainer();
+        kafka.start();
+        log.info(kafka.getLogs());
+        properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
+
+        return properties;
+    }
+
+    @Override
+    public void stop() {
+
+        if (kafka != null) {
+            kafka.close();
+            kafka.stop();
+        }
+
+        if (kerberos != null) {
+            kerberos.close();
+            kerberos.stop();
+        }
+
+    }
+}
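The map returned by start() is applied as configuration overrides to the application under test, which is how the dynamically generated kafka.bootstrap.servers value reaches the connector. One caveat: await() uses Awaitility's default 10-second timeout, so a slow Kerberos handshake can be accommodated with an explicit bound — a hedged usage sketch (java.time.Duration, same assertion as in the tests above):

    await().atMost(Duration.ofSeconds(30))
            .untilAsserted(() -> Assertions.assertEquals(2, get("/kafka").as(TYPE_REF).size()));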
diff --git a/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/containers/KafkaContainer.java b/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/containers/KafkaContainer.java
new file mode 100644
index 0000000000000..1e78d0c25172c
--- /dev/null
+++ b/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/containers/KafkaContainer.java
@@ -0,0 +1,91 @@
+package io.quarkus.it.kafka.containers;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.jboss.logging.Logger;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.utility.MountableFile;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+
+import io.strimzi.StrimziKafkaContainer;
+
+public class KafkaContainer extends FixedHostPortGenericContainer<KafkaContainer> {
+
+    private static final Logger LOGGER = Logger.getLogger(KafkaContainer.class);
+
+    private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
+    private static final int KAFKA_PORT = 9092;
+    private static final String LATEST_KAFKA_VERSION;
+
+    private static final List<String> supportedKafkaVersions = new ArrayList<>(3);
+
+    static {
+        InputStream inputStream = StrimziKafkaContainer.class.getResourceAsStream("/kafka-versions.txt");
+        InputStreamReader streamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+
+        try (BufferedReader bufferedReader = new BufferedReader(streamReader)) {
+            String kafkaVersion;
+            while ((kafkaVersion = bufferedReader.readLine()) != null) {
+                supportedKafkaVersions.add(kafkaVersion);
+            }
+        } catch (IOException e) {
+            LOGGER.error("Unable to load the supported Kafka versions", e);
+        }
+
+        // Sort the Kafka versions from low to high (lexicographic order, which is
+        // sufficient while all version components stay single-digit)
+        Collections.sort(supportedKafkaVersions);
+
+        LATEST_KAFKA_VERSION = supportedKafkaVersions.get(supportedKafkaVersions.size() - 1);
+    }
+
+    public KafkaContainer() {
+        super("quay.io/strimzi/kafka:" + "latest-kafka-" + LATEST_KAFKA_VERSION);
+
+        withExposedPorts(KAFKA_PORT);
+        withFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
+        withCopyFileToContainer(MountableFile.forClasspathResource("kafkaServer.properties"),
+                "/opt/kafka/config/server.properties");
+        withCopyFileToContainer(MountableFile.forClasspathResource("krb5KafkaBroker.conf"), "/etc/krb5.conf");
+        withFileSystemBind("src/test/resources/kafkabroker.keytab", "/opt/kafka/config/kafkabroker.keytab", BindMode.READ_ONLY);
+        waitingFor(Wait.forLogMessage(".*Kafka startTimeMs:.*", 1));
+        withNetwork(Network.SHARED);
+        withEnv("LOG_DIR", "/tmp");
+    }
-f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT); + super.doStart(); + } + + @Override + protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) { + super.containerIsStarting(containerInfo, reused); + LOGGER.info("Kafka servers :: " + getBootstrapServers()); + String command = "#!/bin/bash \n"; + command += "bin/zookeeper-server-start.sh config/zookeeper.properties &\n"; + command += "bin/kafka-server-start.sh config/server.properties" + + " --override listeners=SASL_PLAINTEXT://:" + KAFKA_PORT + + " --override advertised.listeners=" + getBootstrapServers(); + + copyFileToContainer(Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), STARTER_SCRIPT); + } + + public String getBootstrapServers() { + return String.format("SASL_PLAINTEXT://%s:%s", getHost(), KAFKA_PORT); + } + +} diff --git a/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/containers/KerberosContainer.java b/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/containers/KerberosContainer.java new file mode 100644 index 0000000000000..081947b238e6b --- /dev/null +++ b/integration-tests/kafka-sasl-elytron/src/test/java/io/quarkus/it/kafka/containers/KerberosContainer.java @@ -0,0 +1,64 @@ +package io.quarkus.it.kafka.containers; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.InputStreamReader; +import java.time.Duration; +import java.util.stream.Collectors; + +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; + +public class KerberosContainer extends GenericContainer { + + public KerberosContainer(String dockerImageName) { + super(dockerImageName); + withStartupTimeout(Duration.ofMillis(20000)); + withEnv("KRB5_REALM", "EXAMPLE.COM"); + withEnv("KRB5_KDC", "localhost"); + withEnv("KRB5_PASS", "mypass"); + withExposedPorts(749, 464, 88); + withFileSystemBind("src/test/resources/kafkabroker.keytab", "/tmp/keytab/kafkabroker.keytab", BindMode.READ_WRITE); + withFileSystemBind("src/test/resources/client.keytab", "/tmp/keytab/client.keytab", BindMode.READ_WRITE); + waitingFor(Wait.forLogMessage("Principal \"admin/admin@EXAMPLE.COM\" created.*", 1)); + withNetwork(Network.SHARED); + withNetworkAliases("kerberos"); + } + + public void createTestPrincipals() { + try { + ExecResult lsResult = execInContainer("kadmin.local", "-q", "addprinc -randkey kafka/localhost@EXAMPLE.COM"); + lsResult = execInContainer("kadmin.local", "-q", + "ktadd -norandkey -k /tmp/keytab/kafkabroker.keytab kafka/localhost@EXAMPLE.COM"); + lsResult = execInContainer("kadmin.local", "-q", "addprinc -randkey client/localhost@EXAMPLE.COM"); + lsResult = execInContainer("kadmin.local", "-q", + "ktadd -norandkey -k /tmp/keytab/client.keytab client/localhost@EXAMPLE.COM"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void createKrb5File() { + FileOutputStream file; + try { + FileInputStream fis = new FileInputStream("src/test/resources/krb5ClientTemplate.conf"); + BufferedReader reader = new BufferedReader(new InputStreamReader(fis)); + String content = reader.lines() + .collect(Collectors.joining(System.lineSeparator())); + content = content.replaceAll("", getHost()); + content = content.replaceAll("", getMappedPort(88).toString()); + content = content.replaceAll("", getMappedPort(749).toString()); + file = new 
FileOutputStream("target/krb5.conf"); + file.write(content.getBytes()); + file.close(); + reader.close(); + } catch (Exception e) { + e.printStackTrace(); + } + + } + +} diff --git a/integration-tests/kafka-sasl-elytron/src/test/resources/client.keytab b/integration-tests/kafka-sasl-elytron/src/test/resources/client.keytab new file mode 100644 index 0000000000000..47ec7040aa38a Binary files /dev/null and b/integration-tests/kafka-sasl-elytron/src/test/resources/client.keytab differ diff --git a/integration-tests/kafka-sasl-elytron/src/test/resources/kafkaServer.properties b/integration-tests/kafka-sasl-elytron/src/test/resources/kafkaServer.properties new file mode 100644 index 0000000000000..c10c56cc57ba2 --- /dev/null +++ b/integration-tests/kafka-sasl-elytron/src/test/resources/kafkaServer.properties @@ -0,0 +1,172 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=0 + +############################# Socket Server Settings ############################# + +# The address the socket server listens on. It will get the value returned from +# java.net.InetAddress.getCanonicalHostName() if not configured. +# FORMAT: +# listeners = listener_name://host_name:port +# EXAMPLE: +# listeners = PLAINTEXT://your.host.name:9092 +#listeners=PLAINTEXT://:9092 +listeners=SASL_PLAINTEXT://:9092 +#advertised.listeners=SASL_PLAINTEXT://localhost:9092 + + + +# Hostname and port the broker will advertise to producers and consumers. If not set, +# it uses the value for "listeners" if configured. Otherwise, it will use the value +# returned from java.net.InetAddress.getCanonicalHostName(). +#advertised.listeners=PLAINTEXT://your.host.name:9092 +#advertised.listeners=SASL_PLAINTEXT://localhost:9092 + +# Maps listener names to security protocols, the default is for them to be the same. 
diff --git a/integration-tests/kafka-sasl-elytron/src/test/resources/kafkaServer.properties b/integration-tests/kafka-sasl-elytron/src/test/resources/kafkaServer.properties
new file mode 100644
index 0000000000000..c10c56cc57ba2
--- /dev/null
+++ b/integration-tests/kafka-sasl-elytron/src/test/resources/kafkaServer.properties
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+# FORMAT:
+#   listeners = listener_name://host_name:port
+# EXAMPLE:
+#   listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://:9092
+listeners=SASL_PLAINTEXT://:9092
+#advertised.listeners=SASL_PLAINTEXT://localhost:9092
+
+
+
+# Hostname and port the broker will advertise to producers and consumers. If not set,
+# it uses the value for "listeners" if configured. Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+#advertised.listeners=PLAINTEXT://your.host.name:9092
+#advertised.listeners=SASL_PLAINTEXT://localhost:9092
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+listener.security.protocol.map=SASL_PLAINTEXT:SASL_PLAINTEXT
+
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+inter.broker.listener.name=SASL_PLAINTEXT
+
+
+#### SASL ####
+
+sasl.enabled.mechanisms=GSSAPI
+
+sasl.mechanism.inter.broker.protocol=GSSAPI
+
+#listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+#   username="broker" \
+#   password="broker-secret" \
+#   user_broker="broker-secret" \
+#   user_client="client-secret";
+
+listener.name.sasl_plaintext.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
+    useKeyTab=true \
+    storeKey=true \
+    debug=true \
+    serviceName="kafka" \
+    keyTab="/opt/kafka/config/kafkabroker.keytab" \
+    principal="kafka/localhost@EXAMPLE.COM";
+
+sasl.kerberos.service.name=kafka
+
+#ssl.endpoint.identification.algortigm=https://localhost
+ssl.endpoint.identification.algorithm=https
+ssl.client.auth=none
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=45000
+
+
+############################# Group Coordinator Settings #############################
+
+# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
+# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
+# The default value for this is 3 seconds.
+# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
+# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
+group.initial.rebalance.delay.ms=0
\ No newline at end of file
diff --git a/integration-tests/kafka-sasl-elytron/src/test/resources/kafkabroker.keytab b/integration-tests/kafka-sasl-elytron/src/test/resources/kafkabroker.keytab
new file mode 100644
index 0000000000000..f50cd5c434364
Binary files /dev/null and b/integration-tests/kafka-sasl-elytron/src/test/resources/kafkabroker.keytab differ
diff --git a/integration-tests/kafka-sasl-elytron/src/test/resources/krb5ClientTemplate.conf b/integration-tests/kafka-sasl-elytron/src/test/resources/krb5ClientTemplate.conf
new file mode 100644
index 0000000000000..4585b45ee5df6
--- /dev/null
+++ b/integration-tests/kafka-sasl-elytron/src/test/resources/krb5ClientTemplate.conf
@@ -0,0 +1,26 @@
+[logging]
+ default = FILE:/var/log/kerberos/krb5libs.log
+ kdc = FILE:/var/log/kerberos/krb5kdc.log
+ admin_server = FILE:/var/log/kerberos/kadmind.log
+
+[libdefaults]
+ default_realm = EXAMPLE.COM
+ dns_lookup_realm = false
+ dns_lookup_kdc = false
+ ticket_lifetime = 24h
+ renew_lifetime = 7d
+ forwardable = true
+ rdns = false
+ udp_preference_limit = 1
+
+[realms]
+ EXAMPLE.COM = {
+  kdc = <ip_address>:<kdc_port>
+  admin_server = <ip_address>:<admin_server_port>
+ }
+
+[domain_realm]
+ .EXAMPLE.COM = EXAMPLE.COM
+ EXAMPLE.COM = EXAMPLE.COM
+ .localhost = EXAMPLE.COM
+ localhost = EXAMPLE.COM
\ No newline at end of file
diff --git a/integration-tests/kafka-sasl-elytron/src/test/resources/krb5KafkaBroker.conf b/integration-tests/kafka-sasl-elytron/src/test/resources/krb5KafkaBroker.conf
new file mode 100644
index 0000000000000..98c91feb3db06
--- /dev/null
+++ b/integration-tests/kafka-sasl-elytron/src/test/resources/krb5KafkaBroker.conf
@@ -0,0 +1,25 @@
+[logging]
+ default = FILE:/var/log/kerberos/krb5libs.log
+ kdc = FILE:/var/log/kerberos/krb5kdc.log
+ admin_server = FILE:/var/log/kerberos/kadmind.log
+
+[libdefaults]
+ default_realm = EXAMPLE.COM
+ dns_lookup_realm = false
+ dns_lookup_kdc = false
+ ticket_lifetime = 24h
+ renew_lifetime = 7d
+ forwardable = true
+ rdns = false
+
+[realms]
+ EXAMPLE.COM = {
+  kdc = kerberos:88
+  admin_server = kerberos:749
+ }
+
+[domain_realm]
+ .EXAMPLE.COM = EXAMPLE.COM
+ EXAMPLE.COM = EXAMPLE.COM
+ .localhost = EXAMPLE.COM
+ localhost = EXAMPLE.COM
\ No newline at end of file
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index e7649c044d4e2..848fd3011a835 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -134,6 +134,7 @@
         <module>kafka</module>
         <module>kafka-ssl</module>
         <module>kafka-sasl</module>
+        <module>kafka-sasl-elytron</module>
         <module>kafka-snappy</module>
         <module>kafka-avro</module>
         <module>kafka-avro-apicurio2</module>
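Since the build step in KafkaProcessor registers only a configuration default, applications keep control: setting the property explicitly in application.properties overrides it, e.g. to restore strict Elytron compliance (property name as introduced above):

    kafka.wildfly.sasl.relax-compliance=false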