From 4e8f2586a70e0740d9016acb8245b2df2fd7fb0c Mon Sep 17 00:00:00 2001 From: Ales Justin <ales.justin@gmail.com> Date: Thu, 24 Mar 2022 12:13:09 +0100 Subject: [PATCH] Set Ccompat url as default registry url when Confluent serde is on the classpath. --- .../avro/ApicurioRegistryAvroProcessor.java | 6 ++ .../DevServicesApicurioRegistryProcessor.java | 18 +++++- .../kafka-avro-apicurio2/pom.xml | 4 +- .../it/kafka/avro/AvroKafkaCreator.java | 28 +++++--- .../src/main/resources/application.properties | 5 -- .../KafkaAndSchemaRegistryTestResource.java | 50 --------------- .../java/io/quarkus/it/kafka/KafkaAvroIT.java | 2 - .../io/quarkus/it/kafka/KafkaAvroTest.java | 64 ++++--------------- .../quarkus/it/kafka/KafkaAvroTestBase.java | 46 +++++++++++++ 9 files changed, 104 insertions(+), 119 deletions(-) delete mode 100644 integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java create mode 100644 integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTestBase.java diff --git a/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryAvroProcessor.java b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryAvroProcessor.java index 16cb347e8630d4..b4ea4548771ff6 100644 --- a/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryAvroProcessor.java +++ b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryAvroProcessor.java @@ -55,6 +55,12 @@ public void apicurioRegistryAvro(BuildProducer<ReflectiveClassBuildItem> reflect "io.apicurio.rest.client.auth.Auth", "io.apicurio.rest.client.auth.BasicAuth", "io.apicurio.rest.client.auth.OidcAuth")); + + // Confluent API compatibility support -- TODO + + reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false, + "io.confluent.kafka.serializers.KafkaAvroSerializer", + "io.confluent.kafka.serializers.KafkaAvroDeserializer")); } @BuildStep diff --git a/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/DevServicesApicurioRegistryProcessor.java b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/DevServicesApicurioRegistryProcessor.java index 7456b684a2eb7f..fa579798bc67b9 100644 --- a/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/DevServicesApicurioRegistryProcessor.java +++ b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/DevServicesApicurioRegistryProcessor.java @@ -11,6 +11,8 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.utility.DockerImageName; +import io.quarkus.bootstrap.classloading.ClassPathElement; +import io.quarkus.bootstrap.classloading.QuarkusClassLoader; import io.quarkus.deployment.Feature; import io.quarkus.deployment.IsDockerWorking; import io.quarkus.deployment.IsNormal; @@ -118,7 +120,21 @@ public void run() { } private String getRegistryUrlConfig(String baseUrl) { - return baseUrl + "/apis/registry/v2"; + if (isCcompat()) { + return baseUrl + "/apis/ccompat/v6"; + } else { + return baseUrl + "/apis/registry/v2"; + } + } + + private static boolean isCcompat() { + for (ClassPathElement cpe : QuarkusClassLoader + .getElements("io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.class", false)) { + if (cpe.isRuntime()) { + return true; + } + } + return false; } private void shutdownApicurioRegistry() { diff --git a/integration-tests/kafka-avro-apicurio2/pom.xml b/integration-tests/kafka-avro-apicurio2/pom.xml index c52d919cd098cd..f9221831dab4cc 100644 --- a/integration-tests/kafka-avro-apicurio2/pom.xml +++ b/integration-tests/kafka-avro-apicurio2/pom.xml @@ -22,7 +22,7 @@ --> <properties> - <apicurio.version>2.1.5.Final</apicurio.version> + <apicurio.version>2.2.0.Final</apicurio.version> </properties> <dependencies> @@ -279,4 +279,4 @@ </profile> </profiles> -</project> +</project> \ No newline at end of file diff --git a/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java b/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java index 065092035cb61d..cc3c8bd2c601b0 100644 --- a/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java +++ b/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java @@ -31,25 +31,37 @@ public class AvroKafkaCreator { @ConfigProperty(name = "kafka.bootstrap.servers") String bootstrap; - @ConfigProperty(name = "schema.url.confluent") - String confluent; - @ConfigProperty(name = "schema.url.apicurio") - String apicurio; + + @ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url") + String apicurioRegistryUrl; + + public String getApicurioRegistryUrl() { + return apicurioRegistryUrl; + } + + public String getConfluentSchemaRegistryUrl() { + return apicurioRegistryUrl; + } + + public String getApicurioSchemaRegistryUrl() { + int p = apicurioRegistryUrl.indexOf("/apis/ccompat/v6"); + return apicurioRegistryUrl.substring(0, p) + "/apis/registry/v2"; + } public KafkaConsumer<Integer, Pet> createConfluentConsumer(String groupdIdConfig, String subscribtionName) { - return createConfluentConsumer(bootstrap, confluent, groupdIdConfig, subscribtionName); + return createConfluentConsumer(bootstrap, getConfluentSchemaRegistryUrl(), groupdIdConfig, subscribtionName); } public KafkaProducer<Integer, Pet> createConfluentProducer(String clientId) { - return createConfluentProducer(bootstrap, confluent, clientId); + return createConfluentProducer(bootstrap, getConfluentSchemaRegistryUrl(), clientId); } public KafkaConsumer<Integer, Pet> createApicurioConsumer(String groupdIdConfig, String subscribtionName) { - return createApicurioConsumer(bootstrap, apicurio, groupdIdConfig, subscribtionName); + return createApicurioConsumer(bootstrap, getApicurioSchemaRegistryUrl(), groupdIdConfig, subscribtionName); } public KafkaProducer<Integer, Pet> createApicurioProducer(String clientId) { - return createApicurioProducer(bootstrap, apicurio, clientId); + return createApicurioProducer(bootstrap, getApicurioSchemaRegistryUrl(), clientId); } public static KafkaConsumer<Integer, Pet> createConfluentConsumer(String bootstrap, String confluent, diff --git a/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties index eda85eda32cf3d..d4907bbe9f11b9 100644 --- a/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties +++ b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties @@ -4,8 +4,3 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN # enable health check quarkus.kafka.health.enabled=true - -# using QuarkusTestResourceLifecycleManager in this test -# Dev Services are tested by the means of kafka-avro-schema-quickstart -quarkus.kafka.devservices.enabled=false -quarkus.apicurio-registry.devservices.enabled=false diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java deleted file mode 100644 index 696167560eddb1..00000000000000 --- a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java +++ /dev/null @@ -1,50 +0,0 @@ -package io.quarkus.it.kafka; - -import java.util.HashMap; -import java.util.Map; - -import org.testcontainers.containers.GenericContainer; - -import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; -import io.strimzi.test.container.StrimziKafkaContainer; - -public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager { - - private static final StrimziKafkaContainer kafka = new StrimziKafkaContainer().withBrokerId(1); - - private static GenericContainer<?> registry; - - public static String getBootstrapServers() { - return kafka.getBootstrapServers(); - } - - public static String getConfluentSchemaRegistryUrl() { - return "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/ccompat/v6"; - } - - public static String getApicurioSchemaRegistryUrl() { - return "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2"; - } - - @Override - public Map<String, String> start() { - kafka.start(); - registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.1.5.Final") - .withExposedPorts(8080) - .withEnv("QUARKUS_PROFILE", "prod"); - registry.start(); - Map<String, String> properties = new HashMap<>(); - properties.put("schema.url.confluent", - "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/ccompat/v6"); - properties.put("schema.url.apicurio", - "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2"); - properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers()); - return properties; - } - - @Override - public void stop() { - registry.stop(); - kafka.close(); - } -} diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java index 5030447a2e022b..3d7b31c5fc4316 100644 --- a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java +++ b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java @@ -4,12 +4,10 @@ import io.apicurio.registry.rest.client.RegistryClientFactory; import io.apicurio.rest.client.VertxHttpClientProvider; -import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusIntegrationTest; import io.vertx.core.Vertx; @QuarkusIntegrationTest -@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class) public class KafkaAvroIT extends KafkaAvroTest { @BeforeAll public static void setUp() { diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java index 418243f5d778d9..52ae8e25ceff7d 100644 --- a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java +++ b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java @@ -1,32 +1,30 @@ package io.quarkus.it.kafka; -import java.time.Duration; +import javax.inject.Inject; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import io.quarkus.it.kafka.avro.AvroKafkaCreator; import io.quarkus.it.kafka.avro.Pet; -import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; -import io.restassured.RestAssured; @QuarkusTest -@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class) -public class KafkaAvroTest { +public class KafkaAvroTest extends KafkaAvroTestBase { - private static final String CONFLUENT_PATH = "/avro/confluent"; - private static final String APICURIO_PATH = "/avro/apicurio"; + @Inject + AvroKafkaCreator creator; + + @Test + public void testCcompat() { + Assertions.assertTrue(creator.getApicurioRegistryUrl().endsWith("/apis/ccompat/v6")); + } @Test public void testConfluentAvroProducer() { - KafkaConsumer<Integer, Pet> consumer = AvroKafkaCreator.createConfluentConsumer( - KafkaAndSchemaRegistryTestResource.getBootstrapServers(), - KafkaAndSchemaRegistryTestResource.getConfluentSchemaRegistryUrl(), + KafkaConsumer<Integer, Pet> consumer = creator.createConfluentConsumer( "test-avro-confluent", "test-avro-confluent-producer"); testAvroProducer(consumer, CONFLUENT_PATH); @@ -34,18 +32,13 @@ public void testConfluentAvroProducer() { @Test public void testConfluentAvroConsumer() { - KafkaProducer<Integer, Pet> producer = AvroKafkaCreator.createConfluentProducer( - KafkaAndSchemaRegistryTestResource.getBootstrapServers(), - KafkaAndSchemaRegistryTestResource.getConfluentSchemaRegistryUrl(), - "test-avro-confluent-test"); + KafkaProducer<Integer, Pet> producer = creator.createConfluentProducer("test-avro-confluent-test"); testAvroConsumer(producer, CONFLUENT_PATH, "test-avro-confluent-consumer"); } @Test public void testApicurioAvroProducer() { - KafkaConsumer<Integer, Pet> consumer = AvroKafkaCreator.createApicurioConsumer( - KafkaAndSchemaRegistryTestResource.getBootstrapServers(), - KafkaAndSchemaRegistryTestResource.getApicurioSchemaRegistryUrl(), + KafkaConsumer<Integer, Pet> consumer = creator.createApicurioConsumer( "test-avro-apicurio", "test-avro-apicurio-producer"); testAvroProducer(consumer, APICURIO_PATH); @@ -53,38 +46,7 @@ public void testApicurioAvroProducer() { @Test public void testApicurioAvroConsumer() { - KafkaProducer<Integer, Pet> producer = AvroKafkaCreator.createApicurioProducer( - KafkaAndSchemaRegistryTestResource.getBootstrapServers(), - KafkaAndSchemaRegistryTestResource.getApicurioSchemaRegistryUrl(), - "test-avro-apicurio-test"); + KafkaProducer<Integer, Pet> producer = creator.createApicurioProducer("test-avro-apicurio-test"); testAvroConsumer(producer, APICURIO_PATH, "test-avro-apicurio-consumer"); } - - private void testAvroProducer(KafkaConsumer<Integer, Pet> consumer, String path) { - RestAssured.given() - .header("content-type", "application/json") - .body("{\"name\":\"neo\", \"color\":\"tricolor\"}") - .post(path); - ConsumerRecord<Integer, Pet> records = consumer.poll(Duration.ofMillis(20000)).iterator().next(); - Assertions.assertEquals(records.key(), (Integer) 0); - Pet pet = records.value(); - Assertions.assertEquals("neo", pet.getName()); - Assertions.assertEquals("tricolor", pet.getColor()); - consumer.close(); - } - - private void testAvroConsumer(KafkaProducer<Integer, Pet> producer, String path, String topic) { - producer.send(new ProducerRecord<>(topic, 1, createPet())); - Pet retrieved = RestAssured.when().get(path).as(Pet.class); - Assertions.assertEquals("neo", retrieved.getName()); - Assertions.assertEquals("white", retrieved.getColor()); - producer.close(); - } - - private Pet createPet() { - Pet pet = new Pet(); - pet.setName("neo"); - pet.setColor("white"); - return pet; - } } diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTestBase.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTestBase.java new file mode 100644 index 00000000000000..1b822194e7cfcd --- /dev/null +++ b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTestBase.java @@ -0,0 +1,46 @@ +package io.quarkus.it.kafka; + +import java.time.Duration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; + +import io.quarkus.it.kafka.avro.Pet; +import io.restassured.RestAssured; + +public abstract class KafkaAvroTestBase { + + static final String CONFLUENT_PATH = "/avro/confluent"; + static final String APICURIO_PATH = "/avro/apicurio"; + + protected void testAvroProducer(KafkaConsumer<Integer, Pet> consumer, String path) { + RestAssured.given() + .header("content-type", "application/json") + .body("{\"name\":\"neo\", \"color\":\"tricolor\"}") + .post(path); + ConsumerRecord<Integer, Pet> records = consumer.poll(Duration.ofMillis(20000)).iterator().next(); + Assertions.assertEquals(records.key(), (Integer) 0); + Pet pet = records.value(); + Assertions.assertEquals("neo", pet.getName()); + Assertions.assertEquals("tricolor", pet.getColor()); + consumer.close(); + } + + protected void testAvroConsumer(KafkaProducer<Integer, Pet> producer, String path, String topic) { + producer.send(new ProducerRecord<>(topic, 1, createPet())); + Pet retrieved = RestAssured.when().get(path).as(Pet.class); + Assertions.assertEquals("neo", retrieved.getName()); + Assertions.assertEquals("white", retrieved.getColor()); + producer.close(); + } + + private Pet createPet() { + Pet pet = new Pet(); + pet.setName("neo"); + pet.setColor("white"); + return pet; + } +}