Skip to content

Commit

Permalink
Set Ccompat url as default registry url when Confluent serde is on th…
Browse files Browse the repository at this point in the history
…e classpath.
  • Loading branch information
alesj committed Mar 27, 2022
1 parent 9870143 commit e712494
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public void apicurioRegistryAvro(BuildProducer<ReflectiveClassBuildItem> reflect
"io.apicurio.rest.client.auth.Auth",
"io.apicurio.rest.client.auth.BasicAuth",
"io.apicurio.rest.client.auth.OidcAuth"));

// Confluent API compatibility support -- TODO

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false,
"io.confluent.kafka.serializers.KafkaAvroSerializer",
"io.confluent.kafka.serializers.KafkaAvroDeserializer"));
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

import io.quarkus.bootstrap.classloading.ClassPathElement;
import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.IsDockerWorking;
import io.quarkus.deployment.IsNormal;
Expand Down Expand Up @@ -118,7 +120,21 @@ public void run() {
}

private String getRegistryUrlConfig(String baseUrl) {
return baseUrl + "/apis/registry/v2";
if (isCcompat()) {
return baseUrl + "/apis/ccompat/v6";
} else {
return baseUrl + "/apis/registry/v2";
}
}

private static boolean isCcompat() {
for (ClassPathElement cpe : QuarkusClassLoader
.getElements("io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.class", false)) {
if (cpe.isRuntime()) {
return true;
}
}
return false;
}

private void shutdownApicurioRegistry() {
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/kafka-avro-apicurio2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-->

<properties>
<apicurio.version>2.1.5.Final</apicurio.version>
<apicurio.version>2.2.0.Final</apicurio.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -279,4 +279,4 @@
</profile>
</profiles>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,45 @@ public class AvroKafkaCreator {

@ConfigProperty(name = "kafka.bootstrap.servers")
String bootstrap;
@ConfigProperty(name = "schema.url.confluent")
String confluent;
@ConfigProperty(name = "schema.url.apicurio")
String apicurio;

@ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url")
String apicurioRegistryUrl;

public AvroKafkaCreator() {
}

public AvroKafkaCreator(String bootstrap, String apicurioRegistryUrl) {
this.bootstrap = bootstrap;
this.apicurioRegistryUrl = apicurioRegistryUrl;
}

public String getApicurioRegistryUrl() {
return apicurioRegistryUrl;
}

public String getConfluentSchemaRegistryUrl() {
return apicurioRegistryUrl;
}

public String getApicurioSchemaRegistryUrl() {
int p = apicurioRegistryUrl.indexOf("/apis/ccompat/v6");
return apicurioRegistryUrl.substring(0, p) + "/apis/registry/v2";
}

public KafkaConsumer<Integer, Pet> createConfluentConsumer(String groupdIdConfig, String subscribtionName) {
return createConfluentConsumer(bootstrap, confluent, groupdIdConfig, subscribtionName);
return createConfluentConsumer(bootstrap, getConfluentSchemaRegistryUrl(), groupdIdConfig, subscribtionName);
}

public KafkaProducer<Integer, Pet> createConfluentProducer(String clientId) {
return createConfluentProducer(bootstrap, confluent, clientId);
return createConfluentProducer(bootstrap, getConfluentSchemaRegistryUrl(), clientId);
}

public KafkaConsumer<Integer, Pet> createApicurioConsumer(String groupdIdConfig, String subscribtionName) {
return createApicurioConsumer(bootstrap, apicurio, groupdIdConfig, subscribtionName);
return createApicurioConsumer(bootstrap, getApicurioSchemaRegistryUrl(), groupdIdConfig, subscribtionName);
}

public KafkaProducer<Integer, Pet> createApicurioProducer(String clientId) {
return createApicurioProducer(bootstrap, apicurio, clientId);
return createApicurioProducer(bootstrap, getApicurioSchemaRegistryUrl(), clientId);
}

public static KafkaConsumer<Integer, Pet> createConfluentConsumer(String bootstrap, String confluent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,3 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN

# enable health check
quarkus.kafka.health.enabled=true

# using QuarkusTestResourceLifecycleManager in this test
# Dev Services are tested by the means of kafka-avro-schema-quickstart
quarkus.kafka.devservices.enabled=false
quarkus.apicurio-registry.devservices.enabled=false

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
package io.quarkus.it.kafka;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.junit.jupiter.api.BeforeAll;

import io.apicurio.registry.rest.client.RegistryClientFactory;
import io.apicurio.rest.client.VertxHttpClientProvider;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.it.kafka.avro.AvroKafkaCreator;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.vertx.core.Vertx;

@QuarkusIntegrationTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class KafkaAvroIT extends KafkaAvroTest {
public class KafkaAvroIT extends KafkaAvroTestBase {

private AvroKafkaCreator creator;

@Override
AvroKafkaCreator creator() {
if (creator == null) {
Config config = ConfigProvider.getConfig();
String bootstrap = config.getValue("kafka.bootstrap.servers", String.class);
String apicurioRegistryUrl = config.getValue("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
String.class);
creator = new AvroKafkaCreator(bootstrap, apicurioRegistryUrl);
}
return creator;
}

@BeforeAll
public static void setUp() {
// this is for the test JVM, which also uses Kafka client, which in turn also interacts with the registry
Expand Down
Original file line number Diff line number Diff line change
@@ -1,90 +1,18 @@
package io.quarkus.it.kafka;

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import javax.inject.Inject;

import io.quarkus.it.kafka.avro.AvroKafkaCreator;
import io.quarkus.it.kafka.avro.Pet;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;

@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class KafkaAvroTest {

private static final String CONFLUENT_PATH = "/avro/confluent";
private static final String APICURIO_PATH = "/avro/apicurio";

@Test
public void testConfluentAvroProducer() {
KafkaConsumer<Integer, Pet> consumer = AvroKafkaCreator.createConfluentConsumer(
KafkaAndSchemaRegistryTestResource.getBootstrapServers(),
KafkaAndSchemaRegistryTestResource.getConfluentSchemaRegistryUrl(),
"test-avro-confluent",
"test-avro-confluent-producer");
testAvroProducer(consumer, CONFLUENT_PATH);
}
public class KafkaAvroTest extends KafkaAvroTestBase {

@Test
public void testConfluentAvroConsumer() {
KafkaProducer<Integer, Pet> producer = AvroKafkaCreator.createConfluentProducer(
KafkaAndSchemaRegistryTestResource.getBootstrapServers(),
KafkaAndSchemaRegistryTestResource.getConfluentSchemaRegistryUrl(),
"test-avro-confluent-test");
testAvroConsumer(producer, CONFLUENT_PATH, "test-avro-confluent-consumer");
}

@Test
public void testApicurioAvroProducer() {
KafkaConsumer<Integer, Pet> consumer = AvroKafkaCreator.createApicurioConsumer(
KafkaAndSchemaRegistryTestResource.getBootstrapServers(),
KafkaAndSchemaRegistryTestResource.getApicurioSchemaRegistryUrl(),
"test-avro-apicurio",
"test-avro-apicurio-producer");
testAvroProducer(consumer, APICURIO_PATH);
}

@Test
public void testApicurioAvroConsumer() {
KafkaProducer<Integer, Pet> producer = AvroKafkaCreator.createApicurioProducer(
KafkaAndSchemaRegistryTestResource.getBootstrapServers(),
KafkaAndSchemaRegistryTestResource.getApicurioSchemaRegistryUrl(),
"test-avro-apicurio-test");
testAvroConsumer(producer, APICURIO_PATH, "test-avro-apicurio-consumer");
}

private void testAvroProducer(KafkaConsumer<Integer, Pet> consumer, String path) {
RestAssured.given()
.header("content-type", "application/json")
.body("{\"name\":\"neo\", \"color\":\"tricolor\"}")
.post(path);
ConsumerRecord<Integer, Pet> records = consumer.poll(Duration.ofMillis(20000)).iterator().next();
Assertions.assertEquals(records.key(), (Integer) 0);
Pet pet = records.value();
Assertions.assertEquals("neo", pet.getName());
Assertions.assertEquals("tricolor", pet.getColor());
consumer.close();
}

private void testAvroConsumer(KafkaProducer<Integer, Pet> producer, String path, String topic) {
producer.send(new ProducerRecord<>(topic, 1, createPet()));
Pet retrieved = RestAssured.when().get(path).as(Pet.class);
Assertions.assertEquals("neo", retrieved.getName());
Assertions.assertEquals("white", retrieved.getColor());
producer.close();
}
@Inject
AvroKafkaCreator creator;

private Pet createPet() {
Pet pet = new Pet();
pet.setName("neo");
pet.setColor("white");
return pet;
@Override
AvroKafkaCreator creator() {
return creator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.quarkus.it.kafka;

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.quarkus.it.kafka.avro.AvroKafkaCreator;
import io.quarkus.it.kafka.avro.Pet;
import io.restassured.RestAssured;

public abstract class KafkaAvroTestBase {

static final String CONFLUENT_PATH = "/avro/confluent";
static final String APICURIO_PATH = "/avro/apicurio";

abstract AvroKafkaCreator creator();

@Test
public void testCcompat() {
Assertions.assertTrue(creator().getApicurioRegistryUrl().endsWith("/apis/ccompat/v6"));
}

@Test
public void testConfluentAvroProducer() {
KafkaConsumer<Integer, Pet> consumer = creator().createConfluentConsumer(
"test-avro-confluent",
"test-avro-confluent-producer");
testAvroProducer(consumer, CONFLUENT_PATH);
}

@Test
public void testConfluentAvroConsumer() {
KafkaProducer<Integer, Pet> producer = creator().createConfluentProducer("test-avro-confluent-test");
testAvroConsumer(producer, CONFLUENT_PATH, "test-avro-confluent-consumer");
}

@Test
public void testApicurioAvroProducer() {
KafkaConsumer<Integer, Pet> consumer = creator().createApicurioConsumer(
"test-avro-apicurio",
"test-avro-apicurio-producer");
testAvroProducer(consumer, APICURIO_PATH);
}

@Test
public void testApicurioAvroConsumer() {
KafkaProducer<Integer, Pet> producer = creator().createApicurioProducer("test-avro-apicurio-test");
testAvroConsumer(producer, APICURIO_PATH, "test-avro-apicurio-consumer");
}

protected void testAvroProducer(KafkaConsumer<Integer, Pet> consumer, String path) {
RestAssured.given()
.header("content-type", "application/json")
.body("{\"name\":\"neo\", \"color\":\"tricolor\"}")
.post(path);
ConsumerRecord<Integer, Pet> records = consumer.poll(Duration.ofMillis(20000)).iterator().next();
Assertions.assertEquals(records.key(), (Integer) 0);
Pet pet = records.value();
Assertions.assertEquals("neo", pet.getName());
Assertions.assertEquals("tricolor", pet.getColor());
consumer.close();
}

protected void testAvroConsumer(KafkaProducer<Integer, Pet> producer, String path, String topic) {
producer.send(new ProducerRecord<>(topic, 1, createPet()));
Pet retrieved = RestAssured.when().get(path).as(Pet.class);
Assertions.assertEquals("neo", retrieved.getName());
Assertions.assertEquals("white", retrieved.getColor());
producer.close();
}

private Pet createPet() {
Pet pet = new Pet();
pet.setName("neo");
pet.setColor("white");
return pet;
}
}

0 comments on commit e712494

Please sign in to comment.