From 9f922261a35b2a3e62d14a8f7a0ce8cfe2f16861 Mon Sep 17 00:00:00 2001 From: Phillip Kruger Date: Thu, 25 Jun 2020 13:17:36 +0200 Subject: [PATCH] Apicurio added to Avro test Signed-off-by: Phillip Kruger --- integration-tests/kafka-avro/pom.xml | 213 ++++++++++++++++++ .../src/main/avro/pet.avsc | 0 .../quarkus/it/kafka/avro/AvroEndpoint.java | 71 ++++++ .../it/kafka/avro/AvroKafkaCreator.java | 113 ++++++++++ .../src/main/resources/application.properties | 11 + .../io/quarkus/it/kafka/KafkaAvroTest.java | 79 +++++++ .../quarkus/it/kafka/KafkaTestResource.java | 41 ++++ .../it/kafka/SchemaRegistryTestResource.java | 14 +- integration-tests/kafka/pom.xml | 51 +---- .../quarkus/it/kafka/avro/AvroEndpoint.java | 90 -------- .../src/main/resources/application.properties | 4 - .../io/quarkus/it/kafka/KafkaAvroTest.java | 92 -------- integration-tests/pom.xml | 1 + 13 files changed, 544 insertions(+), 236 deletions(-) create mode 100644 integration-tests/kafka-avro/pom.xml rename integration-tests/{kafka => kafka-avro}/src/main/avro/pet.avsc (100%) create mode 100644 integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java create mode 100644 integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java create mode 100644 integration-tests/kafka-avro/src/main/resources/application.properties create mode 100644 integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java create mode 100644 integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java rename integration-tests/{kafka => kafka-avro}/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java (61%) delete mode 100644 integration-tests/kafka/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java delete mode 100644 integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java diff --git a/integration-tests/kafka-avro/pom.xml b/integration-tests/kafka-avro/pom.xml new file mode 100644 index 0000000000000..07c4215bfb46f --- /dev/null +++ b/integration-tests/kafka-avro/pom.xml @@ -0,0 +1,213 @@ + + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + ../ + + 4.0.0 + + quarkus-integration-test-kafka-avro + Quarkus - Integration Tests - Kafka Avro + The Apache Kafka Avro integration tests module + + + true + 1.2.2.Final + + + + + io.quarkus + quarkus-integration-test-class-transformer + + + io.quarkus + quarkus-integration-test-shared-library + + + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-resteasy-jackson + + + io.quarkus + quarkus-resteasy-jsonb + + + + + io.quarkus + quarkus-kafka-client + + + + + org.apache.avro + avro + 1.9.2 + + + + io.confluent + kafka-avro-serializer + 5.5.0 + + + io.swagger + swagger-core + + + jakarta.xml.bind + jakarta.xml.bind-api + + + jakarta.ws.rs + jakarta.ws.rs-api + + + + + + io.apicurio + apicurio-registry-utils-serde + ${apicurio.version} + + + org.jboss.spec.javax.interceptor + jboss-interceptors-api_1.2_spec + + + + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + jakarta.xml.bind + jakarta.xml.bind-api + + + + + io.debezium + debezium-core + test + + + io.debezium + debezium-core + test-jar + test + + + org.apache.kafka + kafka_2.12 + test + + + org.testcontainers + testcontainers + test + + + + + + confluent + https://packages.confluent.io/maven/ + + + + + + + org.apache.avro + avro-maven-plugin + 1.9.2 + + + generate-sources + + schema + + + src/main/avro + ${project.build.directory}/generated-sources + String + + + + + + io.quarkus + quarkus-maven-plugin + + + + build + + + + + + + maven-failsafe-plugin + + true + + + + + maven-surefire-plugin + + true + + + + + + + + test-kafka + + + test-kafka + + + + + + maven-surefire-plugin + + false + + + + maven-failsafe-plugin + + false + + + + + + + + diff --git a/integration-tests/kafka/src/main/avro/pet.avsc b/integration-tests/kafka-avro/src/main/avro/pet.avsc similarity index 100% rename from integration-tests/kafka/src/main/avro/pet.avsc rename to integration-tests/kafka-avro/src/main/avro/pet.avsc diff --git a/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java b/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java new file mode 100644 index 0000000000000..7b3a6274daba7 --- /dev/null +++ b/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java @@ -0,0 +1,71 @@ +package io.quarkus.it.kafka.avro; + +import java.time.Duration; + +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import io.vertx.core.json.JsonObject; + +/** + * Endpoint to test the Avro support + */ +@Path("/avro") +public class AvroEndpoint { + + @GET + @Path("/confluent") + @Produces(MediaType.APPLICATION_JSON) + public JsonObject getConfluent() { + return get(AvroKafkaCreator.createConfluentConsumer("test-avro-confluent-consumer", "test-avro-confluent-consumer")); + } + + @POST + @Path("/confluent") + public void sendConfluent(Pet pet) { + KafkaProducer p = AvroKafkaCreator.createConfluentProducer("test-avro-confluent"); + send(p, pet, "test-avro-confluent-producer"); + } + + @GET + @Path("/apicurio") + @Produces(MediaType.APPLICATION_JSON) + public JsonObject getApicurio() { + return get(AvroKafkaCreator.createApicurioConsumer("test-avro-apicurio-consumer", "test-avro-apicurio-consumer")); + } + + @POST + @Path("/apicurio") + public void sendApicurio(Pet pet) { + KafkaProducer p = AvroKafkaCreator.createApicurioProducer("test-avro-apicurio"); + send(p, pet, "test-avro-apicurio-producer"); + } + + private JsonObject get(KafkaConsumer consumer) { + final ConsumerRecords records = consumer.poll(Duration.ofMillis(60000)); + if (records.isEmpty()) { + return null; + } + ConsumerRecord consumerRecord = records.iterator().next(); + Pet p = consumerRecord.value(); + // We cannot serialize the returned Pet directly, it contains non-serializable object such as the schema. + JsonObject result = new JsonObject(); + result.put("name", p.getName()); + result.put("color", p.getColor()); + return result; + } + + private void send(KafkaProducer producer, Pet pet, String topic) { + producer.send(new ProducerRecord<>(topic, 0, pet)); + producer.flush(); + } +} diff --git a/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java b/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java new file mode 100644 index 0000000000000..a2e5e2e6b82c0 --- /dev/null +++ b/integration-tests/kafka-avro/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java @@ -0,0 +1,113 @@ +package io.quarkus.it.kafka.avro; + +import java.util.Collections; +import java.util.Properties; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; + +import io.apicurio.registry.utils.serde.AbstractKafkaSerDe; +import io.apicurio.registry.utils.serde.AbstractKafkaSerializer; +import io.apicurio.registry.utils.serde.AvroKafkaDeserializer; +import io.apicurio.registry.utils.serde.AvroKafkaSerializer; +import io.apicurio.registry.utils.serde.avro.AvroDatumProvider; +import io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider; +import io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy; +import io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; + +/** + * Create Avro Kafka Consumers and Producers + */ +public class AvroKafkaCreator { + + public static KafkaConsumer createConfluentConsumer(String groupdIdConfig, String subscribtionName) { + Properties p = getConfluentConsumerProperties(groupdIdConfig); + return createConsumer(p, subscribtionName); + } + + public static KafkaConsumer createApicurioConsumer(String groupdIdConfig, String subscribtionName) { + Properties p = getApicurioConsumerProperties(groupdIdConfig); + return createConsumer(p, subscribtionName); + } + + public static KafkaProducer createConfluentProducer(String clientId) { + Properties p = getConfluentProducerProperties(clientId); + return createProducer(p); + } + + public static KafkaProducer createApicurioProducer(String clientId) { + Properties p = getApicurioProducerProperties(clientId); + return createProducer(p); + } + + private static KafkaConsumer createConsumer(Properties props, String subscribtionName) { + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList(subscribtionName)); + return consumer; + } + + private static KafkaProducer createProducer(Properties props) { + return new KafkaProducer<>(props); + } + + private static Properties getConfluentConsumerProperties(String groupdIdConfig) { + Properties props = getGenericConsumerProperties(groupdIdConfig); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, System.getProperty("schema.url.confluent")); + props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); + return props; + } + + public static Properties getApicurioConsumerProperties(String groupdIdConfig) { + Properties props = getGenericConsumerProperties(groupdIdConfig); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName()); + props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, System.getProperty("schema.url.apicurio")); + props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM, ReflectAvroDatumProvider.class.getName()); + return props; + } + + private static Properties getGenericConsumerProperties(String groupdIdConfig) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupdIdConfig); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + return props; + } + + private static Properties getConfluentProducerProperties(String clientId) { + Properties props = getGenericProducerProperties(clientId); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, System.getProperty("schema.url.confluent")); + return props; + } + + private static Properties getApicurioProducerProperties(String clientId) { + Properties props = getGenericProducerProperties(clientId); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); + props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, System.getProperty("schema.url.apicurio")); + props.put(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, SimpleTopicIdStrategy.class.getName()); + props.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, GetOrCreateIdStrategy.class.getName()); + props.put(AvroDatumProvider.REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM, ReflectAvroDatumProvider.class.getName()); + return props; + } + + private static Properties getGenericProducerProperties(String clientId) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); + props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); + return props; + } +} diff --git a/integration-tests/kafka-avro/src/main/resources/application.properties b/integration-tests/kafka-avro/src/main/resources/application.properties new file mode 100644 index 0000000000000..7d25b6f205ef0 --- /dev/null +++ b/integration-tests/kafka-avro/src/main/resources/application.properties @@ -0,0 +1,11 @@ +quarkus.log.category.kafka.level=WARN +quarkus.log.category.\"org.apache.kafka\".level=WARN +quarkus.log.category.\"org.apache.zookeeper\".level=WARN + +# enable health check +quarkus.kafka.health.enabled=true +kafka.bootstrap.servers=localhost:19092 + + +# Native - Avro is not supported in native mode. +quarkus.native.additional-build-args=--initialize-at-run-time=io.quarkus.it.kafka.avro.Pet diff --git a/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java new file mode 100644 index 0000000000000..4a69a09246176 --- /dev/null +++ b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java @@ -0,0 +1,79 @@ +package io.quarkus.it.kafka; + +import java.time.Duration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.quarkus.it.kafka.avro.AvroKafkaCreator; +import io.quarkus.it.kafka.avro.Pet; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.RestAssured; + +@QuarkusTest +@QuarkusTestResource(KafkaTestResource.class) +@QuarkusTestResource(SchemaRegistryTestResource.class) +public class KafkaAvroTest { + + private static final String CONFLUENT_PATH = "/avro/confluent"; + private static final String APICURIO_PATH = "/avro/apicurio"; + + @Test + public void testConfluentAvroProducer() { + KafkaConsumer consumer = AvroKafkaCreator.createConfluentConsumer("test-avro-confluent", + "test-avro-confluent-producer"); + testAvroProducer(consumer, CONFLUENT_PATH); + } + + @Test + public void testConfluentAvroConsumer() { + KafkaProducer producer = AvroKafkaCreator.createConfluentProducer("test-avro-confluent"); + testAvroConsumer(producer, CONFLUENT_PATH, "test-avro-confluent-consumer"); + } + + @Test + public void testApicurioAvroProducer() { + KafkaConsumer consumer = AvroKafkaCreator.createApicurioConsumer("test-avro-apicurio", + "test-avro-apicurio-producer"); + testAvroProducer(consumer, APICURIO_PATH); + } + + @Test + public void testApicurioAvroConsumer() { + KafkaProducer producer = AvroKafkaCreator.createApicurioProducer("test-avro-apicurio"); + testAvroConsumer(producer, APICURIO_PATH, "test-avro-apicurio-consumer"); + } + + private void testAvroProducer(KafkaConsumer consumer, String path) { + RestAssured.given() + .header("content-type", "application/json") + .body("{\"name\":\"neo\", \"color\":\"tricolor\"}") + .post(path); + ConsumerRecord records = consumer.poll(Duration.ofMillis(20000)).iterator().next(); + Assertions.assertEquals(records.key(), (Integer) 0); + Pet pet = records.value(); + Assertions.assertEquals("neo", pet.getName()); + Assertions.assertEquals("tricolor", pet.getColor()); + consumer.close(); + } + + private void testAvroConsumer(KafkaProducer producer, String path, String topic) { + producer.send(new ProducerRecord<>(topic, 1, createPet())); + Pet retrieved = RestAssured.when().get(path).as(Pet.class); + Assertions.assertEquals("neo", retrieved.getName()); + Assertions.assertEquals("white", retrieved.getColor()); + producer.close(); + } + + private Pet createPet() { + Pet pet = new Pet(); + pet.setName("neo"); + pet.setColor("white"); + return pet; + } +} diff --git a/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java new file mode 100644 index 0000000000000..b4e819b120574 --- /dev/null +++ b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/KafkaTestResource.java @@ -0,0 +1,41 @@ +package io.quarkus.it.kafka; + +import java.io.File; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; + +import io.debezium.kafka.KafkaCluster; +import io.debezium.util.Testing; +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class KafkaTestResource implements QuarkusTestResourceLifecycleManager { + + private KafkaCluster kafka; + + @Override + public Map start() { + try { + Properties props = new Properties(); + props.setProperty("zookeeper.connection.timeout.ms", "45000"); + File directory = Testing.Files.createTestingDirectory("kafka-data", true); + kafka = new KafkaCluster().withPorts(2182, 19092) + .addBrokers(1) + .usingDirectory(directory) + .deleteDataUponShutdown(true) + .withKafkaConfiguration(props) + .deleteDataPriorToStartup(true) + .startup(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return Collections.emptyMap(); + } + + @Override + public void stop() { + if (kafka != null) { + kafka.shutdown(); + } + } +} diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java similarity index 61% rename from integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java rename to integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java index 2ee5a253f419e..6deba6adb2a84 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java +++ b/integration-tests/kafka-avro/src/test/java/io/quarkus/it/kafka/SchemaRegistryTestResource.java @@ -1,6 +1,6 @@ package io.quarkus.it.kafka; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.testcontainers.containers.GenericContainer; @@ -9,7 +9,7 @@ public class SchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager { - public GenericContainer registry = new GenericContainer<>("apicurio/apicurio-registry-mem:1.1.0.Final") + public GenericContainer registry = new GenericContainer<>("apicurio/apicurio-registry-mem:1.2.2.Final") .withExposedPorts(8080) .withEnv("QUARKUS_PROFILE", "prod") .withEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:19092") @@ -19,9 +19,13 @@ public class SchemaRegistryTestResource implements QuarkusTestResourceLifecycleM @Override public Map start() { registry.start(); - return Collections - .singletonMap("schema.url", - "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/ccompat"); + Map properties = new HashMap<>(); + properties.put("schema.url.confluent", + "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/api/ccompat"); + properties.put("schema.url.apicurio", + "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/api"); + + return properties; } @Override diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml index 71dd6d3656cf7..d763251fe31b2 100644 --- a/integration-tests/kafka/pom.xml +++ b/integration-tests/kafka/pom.xml @@ -54,26 +54,6 @@ quarkus-kafka-client - - - org.apache.avro - avro - 1.9.1 - - - - io.confluent - kafka-avro-serializer - 5.4.1 - - - io.swagger - swagger-core - - - - - io.quarkus @@ -84,6 +64,12 @@ io.rest-assured rest-assured test + + + jakarta.xml.bind + jakarta.xml.bind-api + + io.debezium @@ -108,33 +94,8 @@ - - - confluent - https://packages.confluent.io/maven/ - - - - - org.apache.avro - avro-maven-plugin - 1.9.1 - - - generate-sources - - schema - - - src/main/avro - ${project.build.directory}/generated-sources - String - - - - io.quarkus quarkus-maven-plugin diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java deleted file mode 100644 index 46cf94e54840e..0000000000000 --- a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/avro/AvroEndpoint.java +++ /dev/null @@ -1,90 +0,0 @@ -package io.quarkus.it.kafka.avro; - -import java.time.Duration; -import java.util.Collections; -import java.util.Properties; - -import javax.annotation.PostConstruct; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import io.vertx.core.json.JsonObject; - -/** - * Endpoint to test the Avro support - */ -@Path("/avro") -public class AvroEndpoint { - - private KafkaConsumer consumer; - private KafkaProducer producer; - - @PostConstruct - public void init() { - String registry = System.getProperty("schema.url"); - producer = createProducer(registry); - consumer = createConsumer(registry); - } - - @GET - @Produces(MediaType.APPLICATION_JSON) - public JsonObject get() { - final ConsumerRecords records = consumer.poll(Duration.ofMillis(60000)); - if (records.isEmpty()) { - return null; - } - Pet p = records.iterator().next().value(); - // We cannot serialize the returned Pet directly, it contains non-serializable object such as the schema. - JsonObject result = new JsonObject(); - result.put("name", p.getName()); - result.put("color", p.getColor()); - return result; - } - - @POST - public void send(Pet pet) { - producer.send(new ProducerRecord<>("test-avro-producer", 0, pet)); - producer.flush(); - } - - public static KafkaConsumer createConsumer(String registry) { - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-avro-consumer"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registry); - props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); - KafkaConsumer consumer = new KafkaConsumer<>(props); - consumer.subscribe(Collections.singletonList("test-avro-consumer")); - return consumer; - } - - public static KafkaProducer createProducer(String registry) { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); - props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-avro"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); - props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registry); - return new KafkaProducer<>(props); - } -} diff --git a/integration-tests/kafka/src/main/resources/application.properties b/integration-tests/kafka/src/main/resources/application.properties index 7d25b6f205ef0..aaa5ea04c4993 100644 --- a/integration-tests/kafka/src/main/resources/application.properties +++ b/integration-tests/kafka/src/main/resources/application.properties @@ -5,7 +5,3 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN # enable health check quarkus.kafka.health.enabled=true kafka.bootstrap.servers=localhost:19092 - - -# Native - Avro is not supported in native mode. -quarkus.native.additional-build-args=--initialize-at-run-time=io.quarkus.it.kafka.avro.Pet diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java deleted file mode 100644 index ec501b3298ae6..0000000000000 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java +++ /dev/null @@ -1,92 +0,0 @@ -package io.quarkus.it.kafka; - -import java.time.Duration; -import java.util.Collections; -import java.util.Properties; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import io.quarkus.it.kafka.avro.Pet; -import io.quarkus.test.common.QuarkusTestResource; -import io.quarkus.test.junit.QuarkusTest; -import io.restassured.RestAssured; - -@QuarkusTest -@QuarkusTestResource(KafkaTestResource.class) -@QuarkusTestResource(SchemaRegistryTestResource.class) -public class KafkaAvroTest { - - public static KafkaConsumer createConsumer() { - String registry = System.getProperty("schema.url"); - - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-avro"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registry); - - // Without you get GenericData.Record instead of `Pet` - props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); - - KafkaConsumer consumer = new KafkaConsumer<>(props); - consumer.subscribe(Collections.singletonList("test-avro-producer")); - return consumer; - } - - public static KafkaProducer createProducer() { - String registry = System.getProperty("schema.url"); - - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); - props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-avro"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); - props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registry); - return new KafkaProducer<>(props); - } - - @Test - public void testAvroProducer() { - KafkaConsumer consumer = createConsumer(); - RestAssured.given() - .header("content-type", "application/json") - .body("{\"name\":\"neo\", \"color\":\"tricolor\"}") - .post("/avro"); - ConsumerRecord records = consumer.poll(Duration.ofMillis(20000)).iterator().next(); - Assertions.assertEquals(records.key(), (Integer) 0); - Pet pet = records.value(); - Assertions.assertEquals("neo", pet.getName()); - Assertions.assertEquals("tricolor", pet.getColor()); - consumer.close(); - } - - @Test - public void testAvroConsumer() { - KafkaProducer producer = createProducer(); - Pet pet = new Pet(); - pet.setName("neo"); - pet.setColor("white"); - producer.send(new ProducerRecord<>("test-avro-consumer", 1, pet)); - Pet retrieved = RestAssured.when().get("/avro").as(Pet.class); - Assertions.assertEquals("neo", retrieved.getName()); - Assertions.assertEquals("white", retrieved.getColor()); - producer.close(); - } - -} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 20cf97b51aa22..e1ef39ffdd3ed 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -31,6 +31,7 @@ gradle main kafka + kafka-avro kafka-streams jpa jpa-derby