diff --git a/integration-tests/kafka-avro/pom.xml b/integration-tests/kafka-avro/pom.xml
new file mode 100644
index 0000000000000..07c4215bfb46f
--- /dev/null
+++ b/integration-tests/kafka-avro/pom.xml
@@ -0,0 +1,213 @@
+
+
+
+ quarkus-integration-tests-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../
+
+ 4.0.0
+
+ quarkus-integration-test-kafka-avro
+ Quarkus - Integration Tests - Kafka Avro
+ The Apache Kafka Avro integration tests module
+
+
+ true
+ 1.2.2.Final
+
+
+
+
+ io.quarkus
+ quarkus-integration-test-class-transformer
+
+
+ io.quarkus
+ quarkus-integration-test-shared-library
+
+
+
+
+ io.quarkus
+ quarkus-resteasy
+
+
+ io.quarkus
+ quarkus-resteasy-jackson
+
+
+ io.quarkus
+ quarkus-resteasy-jsonb
+
+
+
+
+ io.quarkus
+ quarkus-kafka-client
+
+
+
+
+ org.apache.avro
+ avro
+ 1.9.2
+
+
+
+ io.confluent
+ kafka-avro-serializer
+ 5.5.0
+
+
+ io.swagger
+ swagger-core
+
+
+ jakarta.xml.bind
+ jakarta.xml.bind-api
+
+
+ jakarta.ws.rs
+ jakarta.ws.rs-api
+
+
+
+
+
+ io.apicurio
+ apicurio-registry-utils-serde
+ ${apicurio.version}
+
+
+ org.jboss.spec.javax.interceptor
+ jboss-interceptors-api_1.2_spec
+
+
+
+
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+ jakarta.xml.bind
+ jakarta.xml.bind-api
+
+
+
+
+ io.debezium
+ debezium-core
+ test
+
+
+ io.debezium
+ debezium-core
+ test-jar
+ test
+
+
+ org.apache.kafka
+ kafka_2.12
+ test
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+
+
+
+ confluent
+ https://packages.confluent.io/maven/
+
+
+
+
+
+
+ org.apache.avro
+ avro-maven-plugin
+ 1.9.2
+
+
+ generate-sources
+
+ schema
+
+
+ src/main/avro
+ ${project.build.directory}/generated-sources
+ String
+
+
+
+
+
+ io.quarkus
+ quarkus-maven-plugin
+
+
+
+ build
+
+
+
+
+
+
+ maven-failsafe-plugin
+
+ true
+
+
+
+
+ maven-surefire-plugin
+
+ true
+
+
+
+
+
+
+
+ test-kafka
+
+
+ test-kafka
+
+
+
+
+
+ maven-surefire-plugin
+
+ false
+
+
+
+ maven-failsafe-plugin
+
+ false
+
+
+
+
+
+
+
+
diff --git a/integration-tests/kafka/src/main/avro/pet.avsc b/integration-tests/kafka-avro/src/main/avro/pet.avsc
similarity index 100%
rename from integration-tests/kafka/src/main/avro/pet.avsc
rename to integration-tests/kafka-avro/src/main/avro/pet.avsc
diff --git a/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java b/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java
new file mode 100644
index 0000000000000..7b3a6274daba7
--- /dev/null
+++ b/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java
@@ -0,0 +1,71 @@
+package io.quarkus.it.kafka.avro;
+
+import java.time.Duration;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import io.vertx.core.json.JsonObject;
+
+/**
+ * Endpoint to test the Avro support
+ */
+@Path("/avro")
+public class AvroEndpoint {
+
+    @GET
+    @Path("/confluent")
+    @Produces(MediaType.APPLICATION_JSON)
+    public JsonObject getConfluent() {
+        return get(AvroKafkaCreator.createConfluentConsumer("test-avro-confluent-consumer", "test-avro-confluent-consumer"));
+    }
+
+    @POST
+    @Path("/confluent")
+    public void sendConfluent(Pet pet) {
+        KafkaProducer<Integer, Pet> p = AvroKafkaCreator.createConfluentProducer("test-avro-confluent");
+        send(p, pet, "test-avro-confluent-producer");
+    }
+
+    @GET
+    @Path("/apicurio")
+    @Produces(MediaType.APPLICATION_JSON)
+    public JsonObject getApicurio() {
+        return get(AvroKafkaCreator.createApicurioConsumer("test-avro-apicurio-consumer", "test-avro-apicurio-consumer"));
+    }
+
+    @POST
+    @Path("/apicurio")
+    public void sendApicurio(Pet pet) {
+        KafkaProducer<Integer, Pet> p = AvroKafkaCreator.createApicurioProducer("test-avro-apicurio");
+        send(p, pet, "test-avro-apicurio-producer");
+    }
+
+    private JsonObject get(KafkaConsumer<Integer, Pet> consumer) {
+        final ConsumerRecords<Integer, Pet> records = consumer.poll(Duration.ofMillis(60000));
+        if (records.isEmpty()) {
+            return null;
+        }
+        ConsumerRecord<Integer, Pet> consumerRecord = records.iterator().next();
+        Pet p = consumerRecord.value();
+        // We cannot serialize the returned Pet directly, it contains non-serializable object such as the schema.
+        JsonObject result = new JsonObject();
+        result.put("name", p.getName());
+        result.put("color", p.getColor());
+        return result;
+    }
+
+    private void send(KafkaProducer<Integer, Pet> producer, Pet pet, String topic) {
+        producer.send(new ProducerRecord<>(topic, 0, pet));
+        producer.flush();
+    }
+}
diff --git a/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java b/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java
new file mode 100644
index 0000000000000..a2e5e2e6b82c0
--- /dev/null
+++ b/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java
@@ -0,0 +1,113 @@
+package io.quarkus.it.kafka.avro;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+
+import io.apicurio.registry.utils.serde.AbstractKafkaSerDe;
+import io.apicurio.registry.utils.serde.AbstractKafkaSerializer;
+import io.apicurio.registry.utils.serde.AvroKafkaDeserializer;
+import io.apicurio.registry.utils.serde.AvroKafkaSerializer;
+import io.apicurio.registry.utils.serde.avro.AvroDatumProvider;
+import io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider;
+import io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy;
+import io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+
+/**
+ * Create Avro Kafka Consumers and Producers
+ */
+public class AvroKafkaCreator {
+
+    public static KafkaConsumer<Integer, Pet> createConfluentConsumer(String groupIdConfig, String subscriptionName) {
+        Properties p = getConfluentConsumerProperties(groupIdConfig);
+        return createConsumer(p, subscriptionName);
+    }
+
+    public static KafkaConsumer<Integer, Pet> createApicurioConsumer(String groupIdConfig, String subscriptionName) {
+        Properties p = getApicurioConsumerProperties(groupIdConfig);
+        return createConsumer(p, subscriptionName);
+    }
+
+    public static KafkaProducer<Integer, Pet> createConfluentProducer(String clientId) {
+        Properties p = getConfluentProducerProperties(clientId);
+        return createProducer(p);
+    }
+
+    public static KafkaProducer<Integer, Pet> createApicurioProducer(String clientId) {
+        Properties p = getApicurioProducerProperties(clientId);
+        return createProducer(p);
+    }
+
+    private static KafkaConsumer<Integer, Pet> createConsumer(Properties props, String subscriptionName) {
+        KafkaConsumer<Integer, Pet> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(Collections.singletonList(subscriptionName));
+        return consumer;
+    }
+
+    private static KafkaProducer<Integer, Pet> createProducer(Properties props) {
+        return new KafkaProducer<>(props);
+    }
+
+    private static Properties getConfluentConsumerProperties(String groupIdConfig) {
+        Properties props = getGenericConsumerProperties(groupIdConfig);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
+        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, System.getProperty("schema.url.confluent"));
+        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
+        return props;
+    }
+
+    public static Properties getApicurioConsumerProperties(String groupIdConfig) {
+        Properties props = getGenericConsumerProperties(groupIdConfig);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());
+        props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, System.getProperty("schema.url.apicurio"));
+        props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM, ReflectAvroDatumProvider.class.getName());
+        return props;
+    }
+
+    private static Properties getGenericConsumerProperties(String groupIdConfig) {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        return props;
+    }
+
+    private static Properties getConfluentProducerProperties(String clientId) {
+        Properties props = getGenericProducerProperties(clientId);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
+        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, System.getProperty("schema.url.confluent"));
+        return props;
+    }
+
+    private static Properties getApicurioProducerProperties(String clientId) {
+        Properties props = getGenericProducerProperties(clientId);
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
+        props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, System.getProperty("schema.url.apicurio"));
+        props.put(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, SimpleTopicIdStrategy.class.getName());
+        props.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, GetOrCreateIdStrategy.class.getName());
+        props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM, ReflectAvroDatumProvider.class.getName());
+        return props;
+    }
+
+    private static Properties getGenericProducerProperties(String clientId) {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+        return props;
+    }
+}
diff --git a/integration-tests/kafka-avro/src/main/resources/application.properties b/integration-tests/kafka-avro/src/main/resources/application.properties
new file mode 100644
index 0000000000000..7d25b6f205ef0
--- /dev/null
+++ b/integration-tests/kafka-avro/src/main/resources/application.properties
@@ -0,0 +1,11 @@
+quarkus.log.category.kafka.level=WARN
+quarkus.log.category.\"org.apache.kafka\".level=WARN
+quarkus.log.category.\"org.apache.zookeeper\".level=WARN
+
+# enable health check
+quarkus.kafka.health.enabled=true
+kafka.bootstrap.servers=localhost:19092
+
+
+# Native - Avro generated classes are initialized at run time because Avro is not fully supported in native mode.
+quarkus.native.additional-build-args=--initialize-at-run-time=io.quarkus.it.kafka.avro.Pet
diff --git a/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java
new file mode 100644
index 0000000000000..4a69a09246176
--- /dev/null
+++ b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java
@@ -0,0 +1,79 @@
+package io.quarkus.it.kafka;
+
+import java.time.Duration;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.it.kafka.avro.AvroKafkaCreator;
+import io.quarkus.it.kafka.avro.Pet;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.restassured.RestAssured;
+
+@QuarkusTest
+@QuarkusTestResource(KafkaTestResource.class)
+@QuarkusTestResource(SchemaRegistryTestResource.class)
+public class KafkaAvroTest {
+
+    private static final String CONFLUENT_PATH = "/avro/confluent";
+    private static final String APICURIO_PATH = "/avro/apicurio";
+
+    @Test
+    public void testConfluentAvroProducer() {
+        KafkaConsumer<Integer, Pet> consumer = AvroKafkaCreator.createConfluentConsumer("test-avro-confluent",
+                "test-avro-confluent-producer");
+        testAvroProducer(consumer, CONFLUENT_PATH);
+    }
+
+    @Test
+    public void testConfluentAvroConsumer() {
+        KafkaProducer<Integer, Pet> producer = AvroKafkaCreator.createConfluentProducer("test-avro-confluent");
+        testAvroConsumer(producer, CONFLUENT_PATH, "test-avro-confluent-consumer");
+    }
+
+    @Test
+    public void testApicurioAvroProducer() {
+        KafkaConsumer<Integer, Pet> consumer = AvroKafkaCreator.createApicurioConsumer("test-avro-apicurio",
+                "test-avro-apicurio-producer");
+        testAvroProducer(consumer, APICURIO_PATH);
+    }
+
+    @Test
+    public void testApicurioAvroConsumer() {
+        KafkaProducer<Integer, Pet> producer = AvroKafkaCreator.createApicurioProducer("test-avro-apicurio");
+        testAvroConsumer(producer, APICURIO_PATH, "test-avro-apicurio-consumer");
+    }
+
+    private void testAvroProducer(KafkaConsumer<Integer, Pet> consumer, String path) {
+        RestAssured.given()
+                .header("content-type", "application/json")
+                .body("{\"name\":\"neo\", \"color\":\"tricolor\"}")
+                .post(path);
+        ConsumerRecord<Integer, Pet> consumerRecord = consumer.poll(Duration.ofMillis(20000)).iterator().next();
+        Assertions.assertEquals((Integer) 0, consumerRecord.key());
+        Pet pet = consumerRecord.value();
+        Assertions.assertEquals("neo", pet.getName());
+        Assertions.assertEquals("tricolor", pet.getColor());
+        consumer.close();
+    }
+
+    private void testAvroConsumer(KafkaProducer<Integer, Pet> producer, String path, String topic) {
+        producer.send(new ProducerRecord<>(topic, 1, createPet()));
+        Pet retrieved = RestAssured.when().get(path).as(Pet.class);
+        Assertions.assertEquals("neo", retrieved.getName());
+        Assertions.assertEquals("white", retrieved.getColor());
+        producer.close();
+    }
+
+    private Pet createPet() {
+        Pet pet = new Pet();
+        pet.setName("neo");
+        pet.setColor("white");
+        return pet;
+    }
+}
diff --git a/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java
new file mode 100644
index 0000000000000..b4e819b120574
--- /dev/null
+++ b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java
@@ -0,0 +1,41 @@
+package io.quarkus.it.kafka;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import io.debezium.kafka.KafkaCluster;
+import io.debezium.util.Testing;
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+
+public class KafkaTestResource implements QuarkusTestResourceLifecycleManager {
+
+    private KafkaCluster kafka;
+
+    @Override
+    public Map<String, String> start() {
+        try {
+            Properties props = new Properties();
+            props.setProperty("zookeeper.connection.timeout.ms", "45000");
+            File directory = Testing.Files.createTestingDirectory("kafka-data", true);
+            kafka = new KafkaCluster().withPorts(2182, 19092)
+                    .addBrokers(1)
+                    .usingDirectory(directory)
+                    .deleteDataUponShutdown(true)
+                    .withKafkaConfiguration(props)
+                    .deleteDataPriorToStartup(true)
+                    .startup();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void stop() {
+        if (kafka != null) {
+            kafka.shutdown();
+        }
+    }
+}
diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java
similarity index 61%
rename from integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java
rename to integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java
index 2ee5a253f419e..6deba6adb2a84 100644
--- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java
+++ b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java
@@ -1,6 +1,6 @@
 package io.quarkus.it.kafka;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import org.testcontainers.containers.GenericContainer;
@@ -9,7 +9,7 @@
 public class SchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
-    public GenericContainer<?> registry = new GenericContainer<>("apicurio/apicurio-registry-mem:1.1.0.Final")
+    public GenericContainer<?> registry = new GenericContainer<>("apicurio/apicurio-registry-mem:1.2.2.Final")
             .withExposedPorts(8080)
             .withEnv("QUARKUS_PROFILE", "prod")
             .withEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:19092")
@@ -19,9 +19,13 @@ public class SchemaRegistryTestResource implements QuarkusTestResourceLifecycleM
     @Override
     public Map<String, String> start() {
         registry.start();
-        return Collections
-                .singletonMap("schema.url",
-                        "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/ccompat");
+        Map<String, String> properties = new HashMap<>();
+        properties.put("schema.url.confluent",
+                "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/api/ccompat");
+        properties.put("schema.url.apicurio",
+                "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/api");
+
+        return properties;
     }
 
     @Override
diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml
index 71dd6d3656cf7..d763251fe31b2 100644
--- a/integration-tests/kafka/pom.xml
+++ b/integration-tests/kafka/pom.xml
@@ -54,26 +54,6 @@
quarkus-kafka-client
-
-
- org.apache.avro
- avro
- 1.9.1
-
-
-
- io.confluent
- kafka-avro-serializer
- 5.4.1
-
-
- io.swagger
- swagger-core
-
-
-
-
-
io.quarkus
@@ -84,6 +64,12 @@
io.rest-assured
rest-assured
test
+
+
+ jakarta.xml.bind
+ jakarta.xml.bind-api
+
+
io.debezium
@@ -108,33 +94,8 @@
-
-
- confluent
- https://packages.confluent.io/maven/
-
-
-
-
- org.apache.avro
- avro-maven-plugin
- 1.9.1
-
-
- generate-sources
-
- schema
-
-
- src/main/avro
- ${project.build.directory}/generated-sources
- String
-
-
-
-
io.quarkus
quarkus-maven-plugin
diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java
deleted file mode 100644
index 46cf94e54840e..0000000000000
--- a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package io.quarkus.it.kafka.avro;
-
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Properties;
-
-import javax.annotation.PostConstruct;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
-import io.vertx.core.json.JsonObject;
-
-/**
- * Endpoint to test the Avro support
- */
-@Path("/avro")
-public class AvroEndpoint {
-
- private KafkaConsumer consumer;
- private KafkaProducer producer;
-
- @PostConstruct
- public void init() {
- String registry = System.getProperty("schema.url");
- producer = createProducer(registry);
- consumer = createConsumer(registry);
- }
-
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- public JsonObject get() {
- final ConsumerRecords records = consumer.poll(Duration.ofMillis(60000));
- if (records.isEmpty()) {
- return null;
- }
- Pet p = records.iterator().next().value();
- // We cannot serialize the returned Pet directly, it contains non-serializable object such as the schema.
- JsonObject result = new JsonObject();
- result.put("name", p.getName());
- result.put("color", p.getColor());
- return result;
- }
-
- @POST
- public void send(Pet pet) {
- producer.send(new ProducerRecord<>("test-avro-producer", 0, pet));
- producer.flush();
- }
-
- public static KafkaConsumer createConsumer(String registry) {
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-avro-consumer");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registry);
- props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
- KafkaConsumer consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Collections.singletonList("test-avro-consumer"));
- return consumer;
- }
-
- public static KafkaProducer createProducer(String registry) {
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-avro");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
- props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registry);
- return new KafkaProducer<>(props);
- }
-}
diff --git a/integration-tests/kafka/src/main/resources/application.properties b/integration-tests/kafka/src/main/resources/application.properties
index 7d25b6f205ef0..aaa5ea04c4993 100644
--- a/integration-tests/kafka/src/main/resources/application.properties
+++ b/integration-tests/kafka/src/main/resources/application.properties
@@ -5,7 +5,3 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN
# enable health check
quarkus.kafka.health.enabled=true
kafka.bootstrap.servers=localhost:19092
-
-
-# Native - Avro is not supported in native mode.
-quarkus.native.additional-build-args=--initialize-at-run-time=io.quarkus.it.kafka.avro.Pet
diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java
deleted file mode 100644
index ec501b3298ae6..0000000000000
--- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package io.quarkus.it.kafka;
-
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Properties;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
-import io.quarkus.it.kafka.avro.Pet;
-import io.quarkus.test.common.QuarkusTestResource;
-import io.quarkus.test.junit.QuarkusTest;
-import io.restassured.RestAssured;
-
-@QuarkusTest
-@QuarkusTestResource(KafkaTestResource.class)
-@QuarkusTestResource(SchemaRegistryTestResource.class)
-public class KafkaAvroTest {
-
- public static KafkaConsumer createConsumer() {
- String registry = System.getProperty("schema.url");
-
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-avro");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registry);
-
- // Without you get GenericData.Record instead of `Pet`
- props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
-
- KafkaConsumer consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Collections.singletonList("test-avro-producer"));
- return consumer;
- }
-
- public static KafkaProducer createProducer() {
- String registry = System.getProperty("schema.url");
-
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-avro");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
- props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registry);
- return new KafkaProducer<>(props);
- }
-
- @Test
- public void testAvroProducer() {
- KafkaConsumer consumer = createConsumer();
- RestAssured.given()
- .header("content-type", "application/json")
- .body("{\"name\":\"neo\", \"color\":\"tricolor\"}")
- .post("/avro");
- ConsumerRecord records = consumer.poll(Duration.ofMillis(20000)).iterator().next();
- Assertions.assertEquals(records.key(), (Integer) 0);
- Pet pet = records.value();
- Assertions.assertEquals("neo", pet.getName());
- Assertions.assertEquals("tricolor", pet.getColor());
- consumer.close();
- }
-
- @Test
- public void testAvroConsumer() {
- KafkaProducer producer = createProducer();
- Pet pet = new Pet();
- pet.setName("neo");
- pet.setColor("white");
- producer.send(new ProducerRecord<>("test-avro-consumer", 1, pet));
- Pet retrieved = RestAssured.when().get("/avro").as(Pet.class);
- Assertions.assertEquals("neo", retrieved.getName());
- Assertions.assertEquals("white", retrieved.getColor());
- producer.close();
- }
-
-}
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 20cf97b51aa22..e1ef39ffdd3ed 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -31,6 +31,7 @@
         <module>gradle</module>
         <module>main</module>
         <module>kafka</module>
+        <module>kafka-avro</module>
         <module>kafka-streams</module>
         <module>jpa</module>
        <module>jpa-derby</module>