From e712494b053fef73c19502dc021650171b3d0c69 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Thu, 24 Mar 2022 12:13:09 +0100 Subject: [PATCH] Set Ccompat url as default registry url when Confluent serde is on the classpath. --- .../avro/ApicurioRegistryAvroProcessor.java | 6 ++ .../DevServicesApicurioRegistryProcessor.java | 18 +++- .../kafka-avro-apicurio2/pom.xml | 4 +- .../it/kafka/avro/AvroKafkaCreator.java | 36 ++++++-- .../src/main/resources/application.properties | 5 -- .../KafkaAndSchemaRegistryTestResource.java | 50 ----------- .../java/io/quarkus/it/kafka/KafkaAvroIT.java | 22 ++++- .../io/quarkus/it/kafka/KafkaAvroTest.java | 86 ++----------------- .../quarkus/it/kafka/KafkaAvroTestBase.java | 83 ++++++++++++++++++ 9 files changed, 162 insertions(+), 148 deletions(-) delete mode 100644 integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java create mode 100644 integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTestBase.java diff --git a/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryAvroProcessor.java b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryAvroProcessor.java index 16cb347e8630d4..b4ea4548771ff6 100644 --- a/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryAvroProcessor.java +++ b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/ApicurioRegistryAvroProcessor.java @@ -55,6 +55,12 @@ public void apicurioRegistryAvro(BuildProducer reflect "io.apicurio.rest.client.auth.Auth", "io.apicurio.rest.client.auth.BasicAuth", "io.apicurio.rest.client.auth.OidcAuth")); + + // Confluent API compatibility support -- TODO + + reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false, + "io.confluent.kafka.serializers.KafkaAvroSerializer", + "io.confluent.kafka.serializers.KafkaAvroDeserializer")); } @BuildStep diff --git a/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/DevServicesApicurioRegistryProcessor.java b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/DevServicesApicurioRegistryProcessor.java index 7456b684a2eb7f..fa579798bc67b9 100644 --- a/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/DevServicesApicurioRegistryProcessor.java +++ b/extensions/apicurio-registry-avro/deployment/src/main/java/io/quarkus/apicurio/registry/avro/DevServicesApicurioRegistryProcessor.java @@ -11,6 +11,8 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.utility.DockerImageName; +import io.quarkus.bootstrap.classloading.ClassPathElement; +import io.quarkus.bootstrap.classloading.QuarkusClassLoader; import io.quarkus.deployment.Feature; import io.quarkus.deployment.IsDockerWorking; import io.quarkus.deployment.IsNormal; @@ -118,7 +120,21 @@ public void run() { } private String getRegistryUrlConfig(String baseUrl) { - return baseUrl + "/apis/registry/v2"; + if (isCcompat()) { + return baseUrl + "/apis/ccompat/v6"; + } else { + return baseUrl + "/apis/registry/v2"; + } + } + + private static boolean isCcompat() { + for (ClassPathElement cpe : QuarkusClassLoader + .getElements("io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.class", false)) { + if (cpe.isRuntime()) { + return true; + } + } + return false; } private void shutdownApicurioRegistry() { diff --git a/integration-tests/kafka-avro-apicurio2/pom.xml b/integration-tests/kafka-avro-apicurio2/pom.xml index c52d919cd098cd..f9221831dab4cc 100644 --- a/integration-tests/kafka-avro-apicurio2/pom.xml +++ b/integration-tests/kafka-avro-apicurio2/pom.xml @@ -22,7 +22,7 @@ --> - 2.1.5.Final + 2.2.0.Final @@ -279,4 +279,4 @@ - + \ No newline at end of file diff --git a/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java b/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java index 065092035cb61d..03fe26b5a2e030 100644 --- a/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java +++ b/integration-tests/kafka-avro-apicurio2/src/main/java/io/quarkus/it/kafka/avro/AvroKafkaCreator.java @@ -31,25 +31,45 @@ public class AvroKafkaCreator { @ConfigProperty(name = "kafka.bootstrap.servers") String bootstrap; - @ConfigProperty(name = "schema.url.confluent") - String confluent; - @ConfigProperty(name = "schema.url.apicurio") - String apicurio; + + @ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url") + String apicurioRegistryUrl; + + public AvroKafkaCreator() { + } + + public AvroKafkaCreator(String bootstrap, String apicurioRegistryUrl) { + this.bootstrap = bootstrap; + this.apicurioRegistryUrl = apicurioRegistryUrl; + } + + public String getApicurioRegistryUrl() { + return apicurioRegistryUrl; + } + + public String getConfluentSchemaRegistryUrl() { + return apicurioRegistryUrl; + } + + public String getApicurioSchemaRegistryUrl() { + int p = apicurioRegistryUrl.indexOf("/apis/ccompat/v6"); + return apicurioRegistryUrl.substring(0, p) + "/apis/registry/v2"; + } public KafkaConsumer createConfluentConsumer(String groupdIdConfig, String subscribtionName) { - return createConfluentConsumer(bootstrap, confluent, groupdIdConfig, subscribtionName); + return createConfluentConsumer(bootstrap, getConfluentSchemaRegistryUrl(), groupdIdConfig, subscribtionName); } public KafkaProducer createConfluentProducer(String clientId) { - return createConfluentProducer(bootstrap, confluent, clientId); + return createConfluentProducer(bootstrap, getConfluentSchemaRegistryUrl(), clientId); } public KafkaConsumer createApicurioConsumer(String groupdIdConfig, String subscribtionName) { - return createApicurioConsumer(bootstrap, apicurio, groupdIdConfig, subscribtionName); + return createApicurioConsumer(bootstrap, getApicurioSchemaRegistryUrl(), groupdIdConfig, subscribtionName); } public KafkaProducer createApicurioProducer(String clientId) { - return createApicurioProducer(bootstrap, apicurio, clientId); + return createApicurioProducer(bootstrap, getApicurioSchemaRegistryUrl(), clientId); } public static KafkaConsumer createConfluentConsumer(String bootstrap, String confluent, diff --git a/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties index eda85eda32cf3d..d4907bbe9f11b9 100644 --- a/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties +++ b/integration-tests/kafka-avro-apicurio2/src/main/resources/application.properties @@ -4,8 +4,3 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN # enable health check quarkus.kafka.health.enabled=true - -# using QuarkusTestResourceLifecycleManager in this test -# Dev Services are tested by the means of kafka-avro-schema-quickstart -quarkus.kafka.devservices.enabled=false -quarkus.apicurio-registry.devservices.enabled=false diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java deleted file mode 100644 index 696167560eddb1..00000000000000 --- a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAndSchemaRegistryTestResource.java +++ /dev/null @@ -1,50 +0,0 @@ -package io.quarkus.it.kafka; - -import java.util.HashMap; -import java.util.Map; - -import org.testcontainers.containers.GenericContainer; - -import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; -import io.strimzi.test.container.StrimziKafkaContainer; - -public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager { - - private static final StrimziKafkaContainer kafka = new StrimziKafkaContainer().withBrokerId(1); - - private static GenericContainer registry; - - public static String getBootstrapServers() { - return kafka.getBootstrapServers(); - } - - public static String getConfluentSchemaRegistryUrl() { - return "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/ccompat/v6"; - } - - public static String getApicurioSchemaRegistryUrl() { - return "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2"; - } - - @Override - public Map start() { - kafka.start(); - registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.1.5.Final") - .withExposedPorts(8080) - .withEnv("QUARKUS_PROFILE", "prod"); - registry.start(); - Map properties = new HashMap<>(); - properties.put("schema.url.confluent", - "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/ccompat/v6"); - properties.put("schema.url.apicurio", - "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2"); - properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers()); - return properties; - } - - @Override - public void stop() { - registry.stop(); - kafka.close(); - } -} diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java index 5030447a2e022b..9bf78418b44c79 100644 --- a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java +++ b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroIT.java @@ -1,16 +1,32 @@ package io.quarkus.it.kafka; +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; import org.junit.jupiter.api.BeforeAll; import io.apicurio.registry.rest.client.RegistryClientFactory; import io.apicurio.rest.client.VertxHttpClientProvider; -import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.it.kafka.avro.AvroKafkaCreator; import io.quarkus.test.junit.QuarkusIntegrationTest; import io.vertx.core.Vertx; @QuarkusIntegrationTest -@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class) -public class KafkaAvroIT extends KafkaAvroTest { +public class KafkaAvroIT extends KafkaAvroTestBase { + + private AvroKafkaCreator creator; + + @Override + AvroKafkaCreator creator() { + if (creator == null) { + Config config = ConfigProvider.getConfig(); + String bootstrap = config.getValue("kafka.bootstrap.servers", String.class); + String apicurioRegistryUrl = config.getValue("mp.messaging.connector.smallrye-kafka.apicurio.registry.url", + String.class); + creator = new AvroKafkaCreator(bootstrap, apicurioRegistryUrl); + } + return creator; + } + @BeforeAll public static void setUp() { // this is for the test JVM, which also uses Kafka client, which in turn also interacts with the registry diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java index 418243f5d778d9..7aaeea67e39aa2 100644 --- a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java +++ b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTest.java @@ -1,90 +1,18 @@ package io.quarkus.it.kafka; -import java.time.Duration; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import javax.inject.Inject; import io.quarkus.it.kafka.avro.AvroKafkaCreator; -import io.quarkus.it.kafka.avro.Pet; -import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; -import io.restassured.RestAssured; @QuarkusTest -@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class) -public class KafkaAvroTest { - - private static final String CONFLUENT_PATH = "/avro/confluent"; - private static final String APICURIO_PATH = "/avro/apicurio"; - - @Test - public void testConfluentAvroProducer() { - KafkaConsumer consumer = AvroKafkaCreator.createConfluentConsumer( - KafkaAndSchemaRegistryTestResource.getBootstrapServers(), - KafkaAndSchemaRegistryTestResource.getConfluentSchemaRegistryUrl(), - "test-avro-confluent", - "test-avro-confluent-producer"); - testAvroProducer(consumer, CONFLUENT_PATH); - } +public class KafkaAvroTest extends KafkaAvroTestBase { - @Test - public void testConfluentAvroConsumer() { - KafkaProducer producer = AvroKafkaCreator.createConfluentProducer( - KafkaAndSchemaRegistryTestResource.getBootstrapServers(), - KafkaAndSchemaRegistryTestResource.getConfluentSchemaRegistryUrl(), - "test-avro-confluent-test"); - testAvroConsumer(producer, CONFLUENT_PATH, "test-avro-confluent-consumer"); - } - - @Test - public void testApicurioAvroProducer() { - KafkaConsumer consumer = AvroKafkaCreator.createApicurioConsumer( - KafkaAndSchemaRegistryTestResource.getBootstrapServers(), - KafkaAndSchemaRegistryTestResource.getApicurioSchemaRegistryUrl(), - "test-avro-apicurio", - "test-avro-apicurio-producer"); - testAvroProducer(consumer, APICURIO_PATH); - } - - @Test - public void testApicurioAvroConsumer() { - KafkaProducer producer = AvroKafkaCreator.createApicurioProducer( - KafkaAndSchemaRegistryTestResource.getBootstrapServers(), - KafkaAndSchemaRegistryTestResource.getApicurioSchemaRegistryUrl(), - "test-avro-apicurio-test"); - testAvroConsumer(producer, APICURIO_PATH, "test-avro-apicurio-consumer"); - } - - private void testAvroProducer(KafkaConsumer consumer, String path) { - RestAssured.given() - .header("content-type", "application/json") - .body("{\"name\":\"neo\", \"color\":\"tricolor\"}") - .post(path); - ConsumerRecord records = consumer.poll(Duration.ofMillis(20000)).iterator().next(); - Assertions.assertEquals(records.key(), (Integer) 0); - Pet pet = records.value(); - Assertions.assertEquals("neo", pet.getName()); - Assertions.assertEquals("tricolor", pet.getColor()); - consumer.close(); - } - - private void testAvroConsumer(KafkaProducer producer, String path, String topic) { - producer.send(new ProducerRecord<>(topic, 1, createPet())); - Pet retrieved = RestAssured.when().get(path).as(Pet.class); - Assertions.assertEquals("neo", retrieved.getName()); - Assertions.assertEquals("white", retrieved.getColor()); - producer.close(); - } + @Inject + AvroKafkaCreator creator; - private Pet createPet() { - Pet pet = new Pet(); - pet.setName("neo"); - pet.setColor("white"); - return pet; + @Override + AvroKafkaCreator creator() { + return creator; } } diff --git a/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTestBase.java b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTestBase.java new file mode 100644 index 00000000000000..ad231afeaa3f7c --- /dev/null +++ b/integration-tests/kafka-avro-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaAvroTestBase.java @@ -0,0 +1,83 @@ +package io.quarkus.it.kafka; + +import java.time.Duration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.quarkus.it.kafka.avro.AvroKafkaCreator; +import io.quarkus.it.kafka.avro.Pet; +import io.restassured.RestAssured; + +public abstract class KafkaAvroTestBase { + + static final String CONFLUENT_PATH = "/avro/confluent"; + static final String APICURIO_PATH = "/avro/apicurio"; + + abstract AvroKafkaCreator creator(); + + @Test + public void testCcompat() { + Assertions.assertTrue(creator().getApicurioRegistryUrl().endsWith("/apis/ccompat/v6")); + } + + @Test + public void testConfluentAvroProducer() { + KafkaConsumer consumer = creator().createConfluentConsumer( + "test-avro-confluent", + "test-avro-confluent-producer"); + testAvroProducer(consumer, CONFLUENT_PATH); + } + + @Test + public void testConfluentAvroConsumer() { + KafkaProducer producer = creator().createConfluentProducer("test-avro-confluent-test"); + testAvroConsumer(producer, CONFLUENT_PATH, "test-avro-confluent-consumer"); + } + + @Test + public void testApicurioAvroProducer() { + KafkaConsumer consumer = creator().createApicurioConsumer( + "test-avro-apicurio", + "test-avro-apicurio-producer"); + testAvroProducer(consumer, APICURIO_PATH); + } + + @Test + public void testApicurioAvroConsumer() { + KafkaProducer producer = creator().createApicurioProducer("test-avro-apicurio-test"); + testAvroConsumer(producer, APICURIO_PATH, "test-avro-apicurio-consumer"); + } + + protected void testAvroProducer(KafkaConsumer consumer, String path) { + RestAssured.given() + .header("content-type", "application/json") + .body("{\"name\":\"neo\", \"color\":\"tricolor\"}") + .post(path); + ConsumerRecord records = consumer.poll(Duration.ofMillis(20000)).iterator().next(); + Assertions.assertEquals(records.key(), (Integer) 0); + Pet pet = records.value(); + Assertions.assertEquals("neo", pet.getName()); + Assertions.assertEquals("tricolor", pet.getColor()); + consumer.close(); + } + + protected void testAvroConsumer(KafkaProducer producer, String path, String topic) { + producer.send(new ProducerRecord<>(topic, 1, createPet())); + Pet retrieved = RestAssured.when().get(path).as(Pet.class); + Assertions.assertEquals("neo", retrieved.getName()); + Assertions.assertEquals("white", retrieved.getColor()); + producer.close(); + } + + private Pet createPet() { + Pet pet = new Pet(); + pet.setName("neo"); + pet.setColor("white"); + return pet; + } +}