Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
When you use Infinispan and Kafka at the same time and you connect to a Kafka broker with SASL, the SASL client uses Elytron.
However, Elytron SASL implementation is stricter than what Kafka expects.

This commit relaxes Elytron checks for Kafka.
  • Loading branch information
cescoffier committed Sep 27, 2021
1 parent eb3e2ad commit c17f875
Show file tree
Hide file tree
Showing 18 changed files with 792 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.IndexDependencyBuildItem;
import io.quarkus.deployment.builditem.LogCategoryBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.RuntimeConfigSetupCompleteBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
Expand Down Expand Up @@ -158,6 +159,20 @@ void contributeClassesToIndex(BuildProducer<AdditionalIndexedClassesBuildItem> a
indexDependency.produce(new IndexDependencyBuildItem("org.apache.kafka", "kafka-clients"));
}

@BuildStep
void relaxSaslElytron(BuildProducer<RunTimeConfigurationDefaultBuildItem> config) {
// If elytron is on the classpath and the Kafka connection uses SASL, the Elytron client SASL implementation
// is stricter than what Kafka expects. In this case, configure the SASL client to relax some constraints.
// See https://github.com/quarkusio/quarkus/issues/20088.
try {
Class.forName("org.wildfly.security.sasl.gssapi.AbstractGssapiMechanism", false,
Thread.currentThread().getContextClassLoader());
config.produce(new RunTimeConfigurationDefaultBuildItem("kafka.wildfly.sasl.relax-compliance", "true"));
} catch (Exception e) {
// AbstractGssapiMechanism is not on the classpath, do not set wildfly.sasl.relax-compliance
}
}

@BuildStep
public void build(
KafkaBuildTimeConfig config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ public Map<String, Object> createKafkaRuntimeConfig(Config config, ApplicationCo
if (!propertyNameLowerCase.startsWith(CONFIG_PREFIX)) {
continue;
}
// Replace _ by . - This is because Kafka properties tend to use . and env variables use _ for every special
// character. So, replace _ with .
String effectivePropertyName = propertyNameLowerCase.substring(CONFIG_PREFIX.length() + 1).toLowerCase()
.replaceAll("[^a-z0-9.]", ".");
.replace("_", ".");
String value = config.getOptionalValue(propertyName, String.class).orElse("");
result.put(effectivePropertyName, value);
}
Expand Down
199 changes: 199 additions & 0 deletions integration-tests/kafka-sasl-elytron/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>quarkus-integration-tests-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>quarkus-integration-test-kafka-sasl-elytron</artifactId>
<name>Quarkus - Integration Tests - Kafka SASL with Elytron</name>
<description>The Apache Kafka with SASL (Elytron) integration tests module</description>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-integration-test-class-transformer</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-integration-test-shared-library</artifactId>
</dependency>

<!-- JAX-RS -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</dependency>

<!-- Kafka -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

<!-- Infinispan brings Elytron -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-infinispan-client</artifactId>
</dependency>


<!-- test dependencies -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-integration-test-class-transformer-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-infinispan-client-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>

<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>test-kafka</id>
<activation>
<property>
<name>test-containers</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>false</skip>
<systemProperties>
<java.security.krb5.conf>target/krb5.conf</java.security.krb5.conf>
</systemProperties>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.quarkus.it.kafka.sasl;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

@ApplicationScoped
public class KafkaConsumer {

private final Logger log = Logger.getLogger(KafkaConsumer.class);

private final List<String> list = new CopyOnWriteArrayList<>();

@Incoming("in")
public void consume(String value) {
log.info(value);
list.add(value);
}

public List<String> getValues() {
return list;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.quarkus.it.kafka.sasl;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class KafkaProducer {

@Outgoing("out")
public Multi<String> generatePeople() {
return Multi.createFrom().items("test1", "test2");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.it.kafka.sasl;

import java.util.List;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/kafka")
public class KafkaResource {

@Inject
KafkaConsumer consumer;

@GET
@Produces(MediaType.APPLICATION_JSON)
public List<String> getValues() {
return consumer.getValues();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
quarkus.log.category.kafka.level=WARN
quarkus.log.category.\"org.apache.kafka\".level=WARN
quarkus.log.category.\"org.apache.zookeeper\".level=WARN

mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
mp.messaging.connector.smallrye-kafka.sasl.mechanism=GSSAPI
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true debug=true serviceName="kafka" keyTab="src/test/resources/client.keytab" principal="client/[email protected]";
mp.messaging.connector.smallrye-kafka.sasl.kerberos.service.name=kafka
mp.messaging.connector.smallrye-kafka.ssl.endpoint.identification.algorithm=https

mp.messaging.outgoing.out.connector=smallrye-kafka
mp.messaging.outgoing.out.topic=mytopic
mp.messaging.outgoing.out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.in.connector=smallrye-kafka
mp.messaging.incoming.in.topic=mytopic
mp.messaging.incoming.in.auto.offset.reset=earliest
mp.messaging.incoming.in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# enable health check
quarkus.kafka.health.enabled=true

# using QuarkusTestResourceLifecycleManager in this test
quarkus.kafka.devservices.enabled=false
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.quarkus.it.kafka;

import static io.restassured.RestAssured.get;
import static org.awaitility.Awaitility.await;

import java.util.List;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.NativeImageTest;
import io.restassured.common.mapper.TypeRef;

@NativeImageTest
@QuarkusTestResource(KafkaSaslTestResource.class)
public class KafkaSaslIT {

protected static final TypeRef<List<String>> TYPE_REF = new TypeRef<List<String>>() {
};

@Test
public void test() {
await().untilAsserted(() -> Assertions.assertEquals(get("/kafka").as(TYPE_REF).size(), 2));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.quarkus.it.kafka;

import static io.restassured.RestAssured.get;
import static org.awaitility.Awaitility.await;

import java.util.List;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.common.mapper.TypeRef;

@QuarkusTest
@QuarkusTestResource(KafkaSaslTestResource.class)
public class KafkaSaslTest {

protected static final TypeRef<List<String>> TYPE_REF = new TypeRef<List<String>>() {
};

@Test
public void test() {
await().untilAsserted(() -> Assertions.assertEquals(get("/kafka").as(TYPE_REF).size(), 2));
}
}
Loading

0 comments on commit c17f875

Please sign in to comment.