Skip to content

Commit

Permalink
Set Ccompat url as default registry url when Confluent serde is on th…
Browse files Browse the repository at this point in the history
…e classpath.
  • Loading branch information
alesj committed Mar 24, 2022
1 parent f919dcd commit 7a08f3c
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public void apicurioRegistryAvro(BuildProducer<ReflectiveClassBuildItem> reflect
"io.apicurio.rest.client.auth.Auth",
"io.apicurio.rest.client.auth.BasicAuth",
"io.apicurio.rest.client.auth.OidcAuth"));

// Confluent API compatibility support -- TODO

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false,
"io.confluent.kafka.serializers.KafkaAvroSerializer",
"io.confluent.kafka.serializers.KafkaAvroDeserializer"));
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

import io.quarkus.bootstrap.classloading.ClassPathElement;
import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.IsDockerWorking;
import io.quarkus.deployment.IsNormal;
Expand Down Expand Up @@ -118,7 +120,21 @@ public void run() {
}

private String getRegistryUrlConfig(String baseUrl) {
return baseUrl + "/apis/registry/v2";
if (isCcompat()) {
return baseUrl + "/apis/ccompat/v6";
} else {
return baseUrl + "/apis/registry/v2";
}
}

private static boolean isCcompat() {
for (ClassPathElement cpe : QuarkusClassLoader
.getElements("io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.class", false)) {
if (cpe.isRuntime()) {
return true;
}
}
return false;
}

private void shutdownApicurioRegistry() {
Expand Down
31 changes: 9 additions & 22 deletions integration-tests/kafka-avro-apicurio2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,36 +117,23 @@
<scope>test</scope>
</dependency>

<!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-avro-deployment</artifactId>
<artifactId>quarkus-apicurio-registry-avro-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-integration-test-class-transformer-deployment</artifactId>
<artifactId>quarkus-kafka-client-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-client-deployment</artifactId>
<artifactId>quarkus-avro-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
Expand All @@ -159,7 +146,7 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-deployment</artifactId>
<artifactId>quarkus-integration-test-class-transformer-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
Expand All @@ -172,7 +159,7 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jackson-deployment</artifactId>
<artifactId>quarkus-resteasy-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
Expand All @@ -185,7 +172,7 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jsonb-deployment</artifactId>
<artifactId>quarkus-resteasy-jackson-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
Expand All @@ -198,7 +185,7 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro-deployment</artifactId>
<artifactId>quarkus-resteasy-jsonb-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,37 @@ public class AvroKafkaCreator {

@ConfigProperty(name = "kafka.bootstrap.servers")
String bootstrap;
@ConfigProperty(name = "schema.url.confluent")
String confluent;
@ConfigProperty(name = "schema.url.apicurio")
String apicurio;

@ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url")
String apicurioRegistryUrl;

public String getApicurioRegistryUrl() {
return apicurioRegistryUrl;
}

public String getConfluentSchemaRegistryUrl() {
return apicurioRegistryUrl;
}

public String getApicurioSchemaRegistryUrl() {
int p = apicurioRegistryUrl.indexOf("/apis/ccompat/v6");
return apicurioRegistryUrl.substring(0, p) + "/apis/registry/v2";
}

public KafkaConsumer<Integer, Pet> createConfluentConsumer(String groupdIdConfig, String subscribtionName) {
return createConfluentConsumer(bootstrap, confluent, groupdIdConfig, subscribtionName);
return createConfluentConsumer(bootstrap, getConfluentSchemaRegistryUrl(), groupdIdConfig, subscribtionName);
}

public KafkaProducer<Integer, Pet> createConfluentProducer(String clientId) {
return createConfluentProducer(bootstrap, confluent, clientId);
return createConfluentProducer(bootstrap, getConfluentSchemaRegistryUrl(), clientId);
}

public KafkaConsumer<Integer, Pet> createApicurioConsumer(String groupdIdConfig, String subscribtionName) {
return createApicurioConsumer(bootstrap, apicurio, groupdIdConfig, subscribtionName);
return createApicurioConsumer(bootstrap, getApicurioSchemaRegistryUrl(), groupdIdConfig, subscribtionName);
}

public KafkaProducer<Integer, Pet> createApicurioProducer(String clientId) {
return createApicurioProducer(bootstrap, apicurio, clientId);
return createApicurioProducer(bootstrap, getApicurioSchemaRegistryUrl(), clientId);
}

public static KafkaConsumer<Integer, Pet> createConfluentConsumer(String bootstrap, String confluent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,3 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN

# enable health check
quarkus.kafka.health.enabled=true

# using QuarkusTestResourceLifecycleManager in this test
# Dev Services are tested by the means of kafka-avro-schema-quickstart
quarkus.kafka.devservices.enabled=false
quarkus.apicurio-registry.devservices.enabled=false

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@

import io.apicurio.registry.rest.client.RegistryClientFactory;
import io.apicurio.rest.client.VertxHttpClientProvider;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.vertx.core.Vertx;

@QuarkusIntegrationTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class KafkaAvroIT extends KafkaAvroTest {
@BeforeAll
public static void setUp() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,90 +1,52 @@
package io.quarkus.it.kafka;

import java.time.Duration;
import javax.inject.Inject;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.quarkus.it.kafka.avro.AvroKafkaCreator;
import io.quarkus.it.kafka.avro.Pet;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;

@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class KafkaAvroTest {
public class KafkaAvroTest extends KafkaAvroTestBase {

private static final String CONFLUENT_PATH = "/avro/confluent";
private static final String APICURIO_PATH = "/avro/apicurio";
@Inject
AvroKafkaCreator creator;

@Test
public void testCcompat() {
Assertions.assertTrue(creator.getApicurioRegistryUrl().endsWith("/apis/ccompat/v6"));
}

@Test
public void testConfluentAvroProducer() {
KafkaConsumer<Integer, Pet> consumer = AvroKafkaCreator.createConfluentConsumer(
KafkaAndSchemaRegistryTestResource.getBootstrapServers(),
KafkaAndSchemaRegistryTestResource.getConfluentSchemaRegistryUrl(),
KafkaConsumer<Integer, Pet> consumer = creator.createConfluentConsumer(
"test-avro-confluent",
"test-avro-confluent-producer");
testAvroProducer(consumer, CONFLUENT_PATH);
}

@Test
public void testConfluentAvroConsumer() {
KafkaProducer<Integer, Pet> producer = AvroKafkaCreator.createConfluentProducer(
KafkaAndSchemaRegistryTestResource.getBootstrapServers(),
KafkaAndSchemaRegistryTestResource.getConfluentSchemaRegistryUrl(),
"test-avro-confluent-test");
KafkaProducer<Integer, Pet> producer = creator.createConfluentProducer("test-avro-confluent-test");
testAvroConsumer(producer, CONFLUENT_PATH, "test-avro-confluent-consumer");
}

@Test
public void testApicurioAvroProducer() {
KafkaConsumer<Integer, Pet> consumer = AvroKafkaCreator.createApicurioConsumer(
KafkaAndSchemaRegistryTestResource.getBootstrapServers(),
KafkaAndSchemaRegistryTestResource.getApicurioSchemaRegistryUrl(),
KafkaConsumer<Integer, Pet> consumer = creator.createApicurioConsumer(
"test-avro-apicurio",
"test-avro-apicurio-producer");
testAvroProducer(consumer, APICURIO_PATH);
}

@Test
public void testApicurioAvroConsumer() {
KafkaProducer<Integer, Pet> producer = AvroKafkaCreator.createApicurioProducer(
KafkaAndSchemaRegistryTestResource.getBootstrapServers(),
KafkaAndSchemaRegistryTestResource.getApicurioSchemaRegistryUrl(),
"test-avro-apicurio-test");
KafkaProducer<Integer, Pet> producer = creator.createApicurioProducer("test-avro-apicurio-test");
testAvroConsumer(producer, APICURIO_PATH, "test-avro-apicurio-consumer");
}

private void testAvroProducer(KafkaConsumer<Integer, Pet> consumer, String path) {
RestAssured.given()
.header("content-type", "application/json")
.body("{\"name\":\"neo\", \"color\":\"tricolor\"}")
.post(path);
ConsumerRecord<Integer, Pet> records = consumer.poll(Duration.ofMillis(20000)).iterator().next();
Assertions.assertEquals(records.key(), (Integer) 0);
Pet pet = records.value();
Assertions.assertEquals("neo", pet.getName());
Assertions.assertEquals("tricolor", pet.getColor());
consumer.close();
}

private void testAvroConsumer(KafkaProducer<Integer, Pet> producer, String path, String topic) {
producer.send(new ProducerRecord<>(topic, 1, createPet()));
Pet retrieved = RestAssured.when().get(path).as(Pet.class);
Assertions.assertEquals("neo", retrieved.getName());
Assertions.assertEquals("white", retrieved.getColor());
producer.close();
}

private Pet createPet() {
Pet pet = new Pet();
pet.setName("neo");
pet.setColor("white");
return pet;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.quarkus.it.kafka;

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Assertions;

import io.quarkus.it.kafka.avro.Pet;
import io.restassured.RestAssured;

public abstract class KafkaAvroTestBase {

static final String CONFLUENT_PATH = "/avro/confluent";
static final String APICURIO_PATH = "/avro/apicurio";

protected void testAvroProducer(KafkaConsumer<Integer, Pet> consumer, String path) {
RestAssured.given()
.header("content-type", "application/json")
.body("{\"name\":\"neo\", \"color\":\"tricolor\"}")
.post(path);
ConsumerRecord<Integer, Pet> records = consumer.poll(Duration.ofMillis(20000)).iterator().next();
Assertions.assertEquals(records.key(), (Integer) 0);
Pet pet = records.value();
Assertions.assertEquals("neo", pet.getName());
Assertions.assertEquals("tricolor", pet.getColor());
consumer.close();
}

protected void testAvroConsumer(KafkaProducer<Integer, Pet> producer, String path, String topic) {
producer.send(new ProducerRecord<>(topic, 1, createPet()));
Pet retrieved = RestAssured.when().get(path).as(Pet.class);
Assertions.assertEquals("neo", retrieved.getName());
Assertions.assertEquals("white", retrieved.getColor());
producer.close();
}

private Pet createPet() {
Pet pet = new Pet();
pet.setName("neo");
pet.setColor("white");
return pet;
}
}

0 comments on commit 7a08f3c

Please sign in to comment.