Skip to content

Commit

Permalink
Extend Kafka integration tests to verify JKS and PEM handling
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier committed Feb 21, 2024
1 parent d972a37 commit 989c9a1
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 51 deletions.
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3596,7 +3596,7 @@
<dependency>
<groupId>me.escoffier.certs</groupId>
<artifactId>certificate-generator-junit5</artifactId>
<version>0.3.0</version>
<version>0.3.1</version>
</dependency>

<dependency>
Expand Down
5 changes: 5 additions & 0 deletions integration-tests/kafka-ssl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>me.escoffier.certs</groupId>
<artifactId>certificate-generator-junit5</artifactId>
<scope>test</scope>
</dependency>

<!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.quarkus.it.kafka.ssl;

public enum CertificateFormat {
PKCS12,
JKS,
PEM
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import java.util.Collections;
import java.util.Properties;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.QueryParam;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -26,38 +26,42 @@
@Path("/ssl")
public class SslKafkaEndpoint {

private Consumer<Integer, String> consumer;

@Inject
Config config;

@PostConstruct
public void create() {
consumer = createConsumer();
}

@GET
public String get() {
public String get(@QueryParam("format") CertificateFormat format) {
Consumer<Integer, String> consumer = createConsumer(format);
final ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(60000));
if (records.isEmpty()) {
return null;
}
consumer.close();
return records.iterator().next().value();
}

public KafkaConsumer<Integer, String> createConsumer() {
public KafkaConsumer<Integer, String> createConsumer(CertificateFormat format) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getValue("kafka.bootstrap.servers", String.class));
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
File tsFile = new File(config.getValue("ssl-dir", String.class), "kafka-truststore.p12");

String truststore = switch (format) {
case PKCS12 -> "kafka-truststore.p12";
case JKS -> "kafka-truststore.jks";
case PEM -> "kafka-ca.crt";
};

File tsFile = new File(config.getValue("ssl-dir", String.class), truststore);
props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tsFile.getPath());
props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L");
props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
if (format != CertificateFormat.PEM) {
props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L");
}
props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, format.name());
props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ public class KafkaSSLTestResource implements QuarkusTestResourceLifecycleManager
private final StrimziKafkaContainer kafka = new StrimziKafkaContainer()
.withBootstrapServers(c -> String.format("SSL://%s:%s", c.getHost(), c.getMappedPort(KAFKA_PORT)))
.withServerProperties(MountableFile.forClasspathResource("server.properties"))
.withCopyFileToContainer(MountableFile.forClasspathResource("kafka-keystore.p12"),
.withCopyFileToContainer(MountableFile.forHostPath("target/certs/kafka-keystore.p12"),
"/opt/kafka/config/kafka-keystore.p12")
.withCopyFileToContainer(MountableFile.forClasspathResource("kafka-truststore.p12"),
.withCopyFileToContainer(MountableFile.forHostPath("target/certs/kafka-truststore.p12"),
"/opt/kafka/config/kafka-truststore.p12");

@Override
Expand All @@ -29,7 +29,7 @@ public Map<String, String> start() {
// Used by the application
Map<String, String> properties = new HashMap<>();
properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
properties.put("ssl-dir", new File("src/test/resources").getAbsolutePath());
properties.put("ssl-dir", new File("target/certs").getAbsolutePath());

return properties;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,65 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import io.quarkus.it.kafka.ssl.CertificateFormat;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import me.escoffier.certs.Format;
import me.escoffier.certs.junit5.Certificate;
import me.escoffier.certs.junit5.Certificates;

@Certificates(certificates = {
@Certificate(name = "kafka", formats = { Format.PKCS12, Format.JKS,
Format.PEM }, alias = "kafka-test-store", password = "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L")
}, baseDir = "target/certs")
@QuarkusTest
@QuarkusTestResource(KafkaSSLTestResource.class)
public class SslKafkaConsumerTest {

public static Producer<Integer, String> createProducer() {
public static Producer<Integer, String> createProducer(CertificateFormat format) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("bootstrap.servers"));
props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-ssl-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
File tsFile = new File("src/test/resources/kafka-truststore.p12");

String truststore = switch (format) {
case PKCS12 -> "kafka-truststore.p12";
case JKS -> "kafka-truststore.jks";
case PEM -> "kafka-ca.crt";
};

File tsFile = new File("target/certs/" + truststore);
props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tsFile.getPath());
props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L");
props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
if (format != CertificateFormat.PEM) {
props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L");
}
props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, format.name());
props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

return new KafkaProducer<>(props);
}

@Test
public void testReception() {
Producer<Integer, String> consumer = createProducer();
consumer.send(new ProducerRecord<>("test-ssl-consumer", 1, "hi world"));
String string = RestAssured.when().get("/ssl").andReturn().asString();
Assertions.assertEquals("hi world", string);
@ParameterizedTest
@CsvSource({
"PKCS12",
"JKS",
"PEM"
})
public void testReception(String format) {
try (Producer<Integer, String> producer = createProducer(CertificateFormat.valueOf(format))) {
producer.send(new ProducerRecord<>("test-ssl-consumer", 1, "hi world"));
String string = RestAssured
.given().queryParam("format", format)
.when().get("/ssl")
.andReturn().asString();
Assertions.assertEquals("hi world", string);
}
}

}
23 changes: 0 additions & 23 deletions integration-tests/kafka-ssl/src/test/resources/README.md

This file was deleted.

Binary file not shown.
Binary file not shown.

0 comments on commit 989c9a1

Please sign in to comment.