diff --git a/.github/native-tests.json b/.github/native-tests.json
index 4308dea0d13c3a..577be470bb8eb4 100644
--- a/.github/native-tests.json
+++ b/.github/native-tests.json
@@ -57,7 +57,7 @@
{
"category": "Messaging1",
"timeout": 115,
- "test-modules": "kafka, kafka-ssl, kafka-sasl, kafka-avro-apicurio2, kafka-snappy, kafka-streams, reactive-messaging-kafka, kafka-oauth-keycloak",
+ "test-modules": "kafka, kafka-ssl, kafka-sasl, kafka-avro-apicurio2, kafka-json-schema-apicurio2, kafka-snappy, kafka-streams, reactive-messaging-kafka, kafka-oauth-keycloak",
"os-name": "ubuntu-latest"
},
{
diff --git a/bom/application/pom.xml b/bom/application/pom.xml
index 92385c4afe3362..c7213e4efc7969 100644
--- a/bom/application/pom.xml
+++ b/bom/application/pom.xml
@@ -207,7 +207,7 @@
2.22.0
1.3.0.Final
1.11.3
- 2.5.3.Final
+ 2.5.7.Final
0.1.18.Final
1.19.3
3.3.4
@@ -1379,6 +1379,16 @@
quarkus-apicurio-registry-avro-deployment
${project.version}
+
+ io.quarkus
+ quarkus-apicurio-registry-json-schema
+ ${project.version}
+
+
+ io.quarkus
+ quarkus-apicurio-registry-json-schema-deployment
+ ${project.version}
+
io.quarkus
quarkus-confluent-registry-common
@@ -3379,6 +3389,11 @@
apicurio-registry-serdes-avro-serde
${apicurio-registry.version}
+
+ io.apicurio
+ apicurio-registry-serdes-jsonschema-serde
+ ${apicurio-registry.version}
+
io.apicurio
apicurio-common-rest-client-vertx
diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java
index 6afbe631768b1b..6cd1937771799b 100644
--- a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java
+++ b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java
@@ -130,6 +130,7 @@ public interface Capability {
String APICURIO_REGISTRY = QUARKUS_PREFIX + ".apicurio.registry";
String APICURIO_REGISTRY_AVRO = APICURIO_REGISTRY + ".avro";
+ String APICURIO_REGISTRY_JSON_SCHEMA = APICURIO_REGISTRY + ".json";
String CONFLUENT_REGISTRY = QUARKUS_PREFIX + ".confluent.registry";
String CONFLUENT_REGISTRY_AVRO = CONFLUENT_REGISTRY + ".avro";
diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java
index 1f52df61cac15a..3c7fa9726a6641 100644
--- a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java
+++ b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java
@@ -13,6 +13,7 @@ public enum Feature {
AMAZON_LAMBDA,
AZURE_FUNCTIONS,
APICURIO_REGISTRY_AVRO,
+ APICURIO_REGISTRY_JSON_SCHEMA,
AWT,
CACHE,
CDI,
diff --git a/devtools/bom-descriptor-json/pom.xml b/devtools/bom-descriptor-json/pom.xml
index 79f1e15561eedb..66b67abc53c138 100644
--- a/devtools/bom-descriptor-json/pom.xml
+++ b/devtools/bom-descriptor-json/pom.xml
@@ -200,6 +200,19 @@
+
+ io.quarkus
+ quarkus-apicurio-registry-json-schema
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
io.quarkus
quarkus-arc
diff --git a/docs/pom.xml b/docs/pom.xml
index 274c6effd891e3..07bf78b6b8df0c 100644
--- a/docs/pom.xml
+++ b/docs/pom.xml
@@ -216,6 +216,19 @@
+
+ io.quarkus
+ quarkus-apicurio-registry-json-schema-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
io.quarkus
quarkus-arc-deployment
diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java
index 37ea59286d6024..3a574b4424f850 100644
--- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java
+++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java
@@ -356,12 +356,19 @@ private void handleAvro(BuildProducer reflectiveClass,
"java.lang.AutoCloseable"));
}
- // --- Apicurio Registry 2.x ---
+ // --- Apicurio Registry 2.x Avro ---
if (QuarkusClassLoader.isClassPresentAtRuntime("io.apicurio.registry.serde.avro.AvroKafkaDeserializer")
&& !capabilities.isPresent(Capability.APICURIO_REGISTRY_AVRO)) {
throw new RuntimeException(
"Apicurio Registry 2.x Avro classes detected, please use the quarkus-apicurio-registry-avro extension");
}
+
+ // --- Apicurio Registry 2.x Json Schema ---
+ if (QuarkusClassLoader.isClassPresentAtRuntime("io.apicurio.registry.serde.avro.JsonKafkaDeserializer")
+ && !capabilities.isPresent(Capability.APICURIO_REGISTRY_JSON_SCHEMA)) {
+ throw new RuntimeException(
+ "Apicurio Registry 2.x Json classes detected, please use the quarkus-apicurio-registry-json extension");
+ }
}
@BuildStep
diff --git a/extensions/schema-registry/apicurio/json-schema/deployment/pom.xml b/extensions/schema-registry/apicurio/json-schema/deployment/pom.xml
new file mode 100644
index 00000000000000..0fb9a83544704a
--- /dev/null
+++ b/extensions/schema-registry/apicurio/json-schema/deployment/pom.xml
@@ -0,0 +1,45 @@
+
+
+ 4.0.0
+
+
+ io.quarkus
+ quarkus-apicurio-registry-json-schema-parent
+ 999-SNAPSHOT
+
+
+ quarkus-apicurio-registry-json-schema-deployment
+ Quarkus - Apicurio Registry - Json Schema - Deployment
+
+
+
+ io.quarkus
+ quarkus-apicurio-registry-json-schema
+
+
+
+ io.quarkus
+ quarkus-apicurio-registry-common-deployment
+
+
+
+
+
+
+ maven-compiler-plugin
+
+
+
+ io.quarkus
+ quarkus-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
+
diff --git a/extensions/schema-registry/apicurio/json-schema/deployment/src/main/java/io/quarkus/apicurio/registry/jsonschema/ApicurioRegistryJsonSchemaProcessor.java b/extensions/schema-registry/apicurio/json-schema/deployment/src/main/java/io/quarkus/apicurio/registry/jsonschema/ApicurioRegistryJsonSchemaProcessor.java
new file mode 100644
index 00000000000000..83c6f0886bba91
--- /dev/null
+++ b/extensions/schema-registry/apicurio/json-schema/deployment/src/main/java/io/quarkus/apicurio/registry/jsonschema/ApicurioRegistryJsonSchemaProcessor.java
@@ -0,0 +1,51 @@
+package io.quarkus.apicurio.registry.jsonschema;
+
+import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
+import io.quarkus.deployment.Feature;
+import io.quarkus.deployment.annotations.BuildProducer;
+import io.quarkus.deployment.annotations.BuildStep;
+import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
+import io.quarkus.deployment.builditem.FeatureBuildItem;
+import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
+
+public class ApicurioRegistryJsonSchemaProcessor {
+ @BuildStep
+ FeatureBuildItem feature() {
+ return new FeatureBuildItem(Feature.APICURIO_REGISTRY_JSON_SCHEMA);
+ }
+
+ @BuildStep
+ public void apicurioRegistryJsonSchema(BuildProducer reflectiveClass,
+ BuildProducer sslNativeSupport) {
+
+ reflectiveClass
+ .produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer",
+ "io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer").methods().build());
+
+ reflectiveClass.produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy",
+ "io.apicurio.registry.serde.strategy.TopicIdStrategy",
+ "io.apicurio.registry.serde.strategy.QualifiedRecordIdStrategy",
+ "io.apicurio.registry.serde.strategy.RecordIdStrategy",
+ "io.apicurio.registry.serde.jsonschema.strategy.TopicRecordIdStrategy").methods().fields()
+ .build());
+
+ reflectiveClass.produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.DefaultIdHandler",
+ "io.apicurio.registry.serde.Legacy4ByteIdHandler",
+ "io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider",
+ "io.apicurio.registry.serde.headers.DefaultHeadersHandler").methods().fields()
+ .build());
+
+ String defaultSchemaResolver = "io.apicurio.registry.serde.DefaultSchemaResolver";
+ if (QuarkusClassLoader.isClassPresentAtRuntime(defaultSchemaResolver)) {
+ // Class not present after 2.2.0.Final
+ reflectiveClass.produce(ReflectiveClassBuildItem.builder(defaultSchemaResolver).methods()
+ .fields().build());
+ }
+ }
+
+ @BuildStep
+ ExtensionSslNativeSupportBuildItem enableSslInNative() {
+ return new ExtensionSslNativeSupportBuildItem(Feature.APICURIO_REGISTRY_JSON_SCHEMA);
+ }
+
+}
diff --git a/extensions/schema-registry/apicurio/json-schema/pom.xml b/extensions/schema-registry/apicurio/json-schema/pom.xml
new file mode 100644
index 00000000000000..946f8f2d857f20
--- /dev/null
+++ b/extensions/schema-registry/apicurio/json-schema/pom.xml
@@ -0,0 +1,21 @@
+
+
+
+ quarkus-apicurio-registry-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../pom.xml
+
+
+ 4.0.0
+ quarkus-apicurio-registry-json-schema-parent
+ Quarkus - Apicurio Registry - Json Schema
+ pom
+
+
+ deployment
+ runtime
+
+
\ No newline at end of file
diff --git a/extensions/schema-registry/apicurio/json-schema/runtime/pom.xml b/extensions/schema-registry/apicurio/json-schema/runtime/pom.xml
new file mode 100644
index 00000000000000..cdb6a28929e451
--- /dev/null
+++ b/extensions/schema-registry/apicurio/json-schema/runtime/pom.xml
@@ -0,0 +1,61 @@
+
+
+ 4.0.0
+
+
+ io.quarkus
+ quarkus-apicurio-registry-json-schema-parent
+ 999-SNAPSHOT
+
+
+ quarkus-apicurio-registry-json-schema
+ Quarkus - Apicurio Registry - Json Schema - Runtime
+ Use Apicurio as Json schema registry
+
+
+
+ io.apicurio
+ apicurio-registry-serdes-jsonschema-serde
+
+
+ io.apicurio
+ apicurio-common-rest-client-jdk
+
+
+
+
+
+ io.quarkus
+ quarkus-apicurio-registry-common
+
+
+
+
+
+
+
+ io.quarkus
+ quarkus-extension-maven-plugin
+
+
+ io.quarkus.apicurio.registry.json
+
+
+
+
+ maven-compiler-plugin
+
+
+
+ io.quarkus
+ quarkus-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
diff --git a/extensions/schema-registry/apicurio/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/schema-registry/apicurio/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml
new file mode 100644
index 00000000000000..7c0e0a875a9824
--- /dev/null
+++ b/extensions/schema-registry/apicurio/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml
@@ -0,0 +1,11 @@
+---
+artifact: ${project.groupId}:${project.artifactId}:${project.version}
+name: "Apicurio Registry - Json Schema"
+metadata:
+ keywords:
+ - "apicurio"
+ - "json-schema"
+ guide: ""
+ categories:
+ - "serialization"
+ status: "draft"
diff --git a/extensions/schema-registry/apicurio/pom.xml b/extensions/schema-registry/apicurio/pom.xml
index ce3c9d4de020f9..48249736defb50 100644
--- a/extensions/schema-registry/apicurio/pom.xml
+++ b/extensions/schema-registry/apicurio/pom.xml
@@ -17,5 +17,6 @@
common
avro
+ json-schema
diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java
index dbe9ede291e332..3582c5bccfa372 100644
--- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java
+++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java
@@ -36,7 +36,7 @@ class DefaultSerdeDiscoveryState {
private Boolean hasConfluent;
private Boolean hasApicurio1;
- private Boolean hasApicurio2;
+ private Boolean hasApicurio2Avro;
private Boolean hasJsonb;
DefaultSerdeDiscoveryState(IndexView index) {
@@ -155,18 +155,18 @@ boolean hasApicurio1() {
return hasApicurio1;
}
- boolean hasApicurio2() {
- if (hasApicurio2 == null) {
+ boolean hasApicurio2Avro() {
+ if (hasApicurio2Avro == null) {
try {
Class.forName("io.apicurio.registry.serde.avro.AvroKafkaDeserializer", false,
Thread.currentThread().getContextClassLoader());
- hasApicurio2 = true;
+ hasApicurio2Avro = true;
} catch (ClassNotFoundException e) {
- hasApicurio2 = false;
+ hasApicurio2Avro = false;
}
}
- return hasApicurio2;
+ return hasApicurio2Avro;
}
boolean hasJsonb() {
diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java
index a14824a45801f4..503b81f253dfb6 100644
--- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java
+++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java
@@ -880,7 +880,7 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T
int avroLibraries = 0;
avroLibraries += discovery.hasConfluent() ? 1 : 0;
avroLibraries += discovery.hasApicurio1() ? 1 : 0;
- avroLibraries += discovery.hasApicurio2() ? 1 : 0;
+ avroLibraries += discovery.hasApicurio2Avro() ? 1 : 0;
if (avroLibraries > 1) {
LOGGER.debugf("Skipping Avro serde autodetection for %s, because multiple Avro serde libraries are present",
typeName);
@@ -897,7 +897,7 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T
? Result.of("io.apicurio.registry.utils.serde.AvroKafkaSerializer")
: Result.of("io.apicurio.registry.utils.serde.AvroKafkaDeserializer")
.with(isAvroGenerated, "apicurio.registry.use-specific-avro-reader", "true");
- } else if (discovery.hasApicurio2()) {
+ } else if (discovery.hasApicurio2Avro()) {
return serializer
? Result.of("io.apicurio.registry.serde.avro.AvroKafkaSerializer")
: Result.of("io.apicurio.registry.serde.avro.AvroKafkaDeserializer")
@@ -908,6 +908,8 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T
}
}
+ //TODO autodiscovery of json serdes
+
// Jackson-based serializer/deserializer
// note that Jackson is always present with Kafka, so no need to check
{
diff --git a/integration-tests/kafka-json-schema-apicurio2/pom.xml b/integration-tests/kafka-json-schema-apicurio2/pom.xml
new file mode 100644
index 00000000000000..09b6d334469f6c
--- /dev/null
+++ b/integration-tests/kafka-json-schema-apicurio2/pom.xml
@@ -0,0 +1,222 @@
+
+
+
+ quarkus-integration-tests-parent
+ io.quarkus
+ 999-SNAPSHOT
+
+ 4.0.0
+
+ quarkus-integration-test-kafka-json-schema-apicurio2
+ Quarkus - Integration Tests - Kafka Json Schema with Apicurio 2.x
+ The Apache Kafka Json Schema with Apicurio Registry 2.x integration tests module
+
+
+
+ io.quarkus
+ quarkus-integration-test-class-transformer
+
+
+ io.quarkus
+ quarkus-integration-test-shared-library
+
+
+
+
+ io.quarkus
+ quarkus-resteasy
+
+
+ io.quarkus
+ quarkus-resteasy-jackson
+
+
+
+
+ io.quarkus
+ quarkus-kafka-client
+
+
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-csv
+
+
+
+ io.quarkus
+ quarkus-apicurio-registry-json-schema
+
+
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+ jakarta.xml.bind
+ jakarta.xml.bind-api
+
+
+
+
+ io.strimzi
+ strimzi-test-container
+ test
+
+
+ org.apache.logging.log4j
+ log4j-core
+
+
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+
+
+ io.quarkus
+ quarkus-integration-test-class-transformer-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-kafka-client-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-resteasy-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-resteasy-jackson-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-apicurio-registry-json-schema-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+
+
+
+ confluent
+ https://packages.confluent.io/maven/
+
+ false
+
+
+
+
+
+
+
+ io.quarkus
+ quarkus-maven-plugin
+
+
+
+ generate-code
+ build
+
+
+
+
+
+
+ maven-failsafe-plugin
+
+ false
+
+
+
+
+ maven-surefire-plugin
+
+ false
+
+
+
+
+
+
+
+ test-kafka
+
+
+ test-containers
+
+
+
+
+
+ maven-surefire-plugin
+
+ false
+
+
+
+ maven-failsafe-plugin
+
+ false
+
+
+
+
+
+
+
+
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java
new file mode 100644
index 00000000000000..f65ff696a15a26
--- /dev/null
+++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java
@@ -0,0 +1,58 @@
+package io.quarkus.it.kafka.jsonschema;
+
+import java.time.Duration;
+
+import jakarta.inject.Inject;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.POST;
+import jakarta.ws.rs.Path;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import io.vertx.core.json.JsonObject;
+
+/**
+ * Endpoint to test the Json Schema support
+ */
+@Path("/json-schema")
+public class JsonSchemaEndpoint {
+
+ @Inject
+ JsonSchemaKafkaCreator creator;
+
+ @GET
+ @Path("/apicurio")
+ public JsonObject getApicurio() {
+ return get(creator.createApicurioConsumer("test-json-schema-apicurio-consumer", "test-json-schema-apicurio-consumer"));
+ }
+
+ @POST
+ @Path("/apicurio")
+ public void sendApicurio(Pet pet) {
+ KafkaProducer p = creator.createApicurioProducer("test-json-schema-apicurio");
+ send(p, pet, "test-json-schema-apicurio-producer");
+ }
+
+ private JsonObject get(KafkaConsumer consumer) {
+ final ConsumerRecords records = consumer.poll(Duration.ofMillis(60000));
+ if (records.isEmpty()) {
+ return null;
+ }
+ ConsumerRecord consumerRecord = records.iterator().next();
+ Pet p = consumerRecord.value();
+ // We cannot serialize the returned Pet directly, it contains non-serializable object such as the schema.
+ JsonObject result = new JsonObject();
+ result.put("name", p.getName());
+ result.put("color", p.getColor());
+ return result;
+ }
+
+ private void send(KafkaProducer producer, Pet pet, String topic) {
+ producer.send(new ProducerRecord<>(topic, 0, pet));
+ producer.flush();
+ }
+}
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java
new file mode 100644
index 00000000000000..989d2f0e106672
--- /dev/null
+++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java
@@ -0,0 +1,117 @@
+package io.quarkus.it.kafka.jsonschema;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+import io.apicurio.registry.serde.SerdeConfig;
+import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
+import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer;
+
+/**
+ * Create Json Schema Kafka Consumers and Producers
+ */
+@ApplicationScoped
+public class JsonSchemaKafkaCreator {
+
+ @ConfigProperty(name = "kafka.bootstrap.servers")
+ String bootstrap;
+
+ @ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url")
+ String apicurioRegistryUrl;
+
+ public JsonSchemaKafkaCreator() {
+ }
+
+ public JsonSchemaKafkaCreator(String bootstrap, String apicurioRegistryUrl) {
+ this.bootstrap = bootstrap;
+ this.apicurioRegistryUrl = apicurioRegistryUrl;
+ }
+
+ public String getApicurioRegistryUrl() {
+ return apicurioRegistryUrl;
+ }
+
+ public KafkaConsumer createApicurioConsumer(String groupdIdConfig, String subscribtionName) {
+ return createApicurioConsumer(bootstrap, getApicurioRegistryUrl(), groupdIdConfig, subscribtionName);
+ }
+
+ public KafkaProducer createApicurioProducer(String clientId) {
+ return createApicurioProducer(bootstrap, getApicurioRegistryUrl(), clientId);
+ }
+
+ public static KafkaConsumer createApicurioConsumer(String bootstrap, String apicurio,
+ String groupdIdConfig, String subscribtionName) {
+ Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig);
+ return createConsumer(p, subscribtionName);
+ }
+
+ public static KafkaProducer createApicurioProducer(String bootstrap, String apicurio,
+ String clientId) {
+ Properties p = getApicurioProducerProperties(bootstrap, apicurio, clientId);
+ return createProducer(p);
+ }
+
+ private static KafkaConsumer createConsumer(Properties props, String subscribtionName) {
+ if (!props.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) {
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+ }
+ KafkaConsumer consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Collections.singletonList(subscribtionName));
+ return consumer;
+ }
+
+ private static KafkaProducer createProducer(Properties props) {
+ if (!props.containsKey(ProducerConfig.CLIENT_ID_CONFIG)) {
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+ }
+ return new KafkaProducer<>(props);
+ }
+
+ public static Properties getApicurioConsumerProperties(String bootstrap, String apicurio, String groupdIdConfig) {
+ Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSchemaKafkaDeserializer.class.getName());
+ props.put(SerdeConfig.REGISTRY_URL, apicurio);
+ return props;
+ }
+
+ private static Properties getGenericConsumerProperties(String bootstrap, String groupdIdConfig) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupdIdConfig);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+ return props;
+ }
+
+ private static Properties getApicurioProducerProperties(String bootstrap, String apicurio, String clientId) {
+ Properties props = getGenericProducerProperties(bootstrap, clientId);
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSchemaKafkaSerializer.class.getName());
+ props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true);
+ props.put(SerdeConfig.SCHEMA_LOCATION, "json-schema.json");
+ props.put(SerdeConfig.VALIDATION_ENABLED, "true");
+ props.put(SerdeConfig.REGISTRY_URL, apicurio);
+ return props;
+ }
+
+ private static Properties getGenericProducerProperties(String bootstrap, String clientId) {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+ return props;
+ }
+}
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java
new file mode 100644
index 00000000000000..ee47fb2fe9482c
--- /dev/null
+++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java
@@ -0,0 +1,31 @@
+package io.quarkus.it.kafka.jsonschema;
+
+public class Pet {
+
+ private String name;
+ private String color;
+
+ public Pet() {
+ }
+
+ public Pet(String name, String color) {
+ this.name = name;
+ this.color = color;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getColor() {
+ return color;
+ }
+
+ public void setColor(String color) {
+ this.color = color;
+ }
+}
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/resources/application.properties b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/application.properties
new file mode 100644
index 00000000000000..69d4364f6b1c5e
--- /dev/null
+++ b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/application.properties
@@ -0,0 +1,10 @@
+quarkus.log.category.kafka.level=WARN
+quarkus.log.category.\"org.apache.kafka\".level=WARN
+quarkus.log.category.\"org.apache.zookeeper\".level=WARN
+
+quarkus.native.resources.includes=json-schema.json
+
+# enable health check
+quarkus.kafka.health.enabled=true
+
+quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.4.2.Final
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/resources/json-schema.json b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/json-schema.json
new file mode 100644
index 00000000000000..18a1c5f482bd62
--- /dev/null
+++ b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/json-schema.json
@@ -0,0 +1,16 @@
+{
+ "$id": "https://example.com/person.schema.json",
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "Pet",
+ "type": "object",
+ "properties": {
+ "name": {
+ "type": "string",
+ "description": "The pet's name."
+ },
+ "color": {
+ "type": "string",
+ "description": "The pet's color."
+ }
+ }
+}
\ No newline at end of file
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java
new file mode 100644
index 00000000000000..31ddb232969385
--- /dev/null
+++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java
@@ -0,0 +1,29 @@
+package io.quarkus.it.kafka;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import io.apicurio.registry.rest.client.RegistryClientFactory;
+import io.apicurio.rest.client.VertxHttpClientProvider;
+import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import io.vertx.core.Vertx;
+
+@QuarkusIntegrationTest
+@QuarkusTestResource(value = KafkaResource.class, restrictToAnnotatedClass = true)
+public class KafkaJsonSchemaIT extends KafkaJsonSchemaTestBase {
+
+ JsonSchemaKafkaCreator creator;
+
+ @Override
+ JsonSchemaKafkaCreator creator() {
+ return creator;
+ }
+
+ @BeforeAll
+ public static void setUp() {
+ // this is for the test JVM, which also uses Kafka client, which in turn also interacts with the registry
+ RegistryClientFactory.setProvider(new VertxHttpClientProvider(Vertx.vertx()));
+ }
+
+}
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java
new file mode 100644
index 00000000000000..606ded95dadfb1
--- /dev/null
+++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java
@@ -0,0 +1,18 @@
+package io.quarkus.it.kafka;
+
+import jakarta.inject.Inject;
+
+import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator;
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTest
+public class KafkaJsonSchemaTest extends KafkaJsonSchemaTestBase {
+
+ @Inject
+ JsonSchemaKafkaCreator creator;
+
+ @Override
+ JsonSchemaKafkaCreator creator() {
+ return creator;
+ }
+}
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java
new file mode 100644
index 00000000000000..796540becc0a7c
--- /dev/null
+++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java
@@ -0,0 +1,71 @@
+package io.quarkus.it.kafka;
+
+import java.time.Duration;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator;
+import io.quarkus.it.kafka.jsonschema.Pet;
+import io.restassured.RestAssured;
+
+public abstract class KafkaJsonSchemaTestBase {
+
+ static final String APICURIO_PATH = "/json-schema/apicurio";
+
+ abstract JsonSchemaKafkaCreator creator();
+
+ @Test
+ public void testUrls() {
+ Assertions.assertTrue(creator().getApicurioRegistryUrl().endsWith("/apis/registry/v2"));
+ }
+
+ @Test
+ public void testApicurioJsonSchemaProducer() {
+ String subscriptionName = "test-json-schema-apicurio-producer";
+
+ KafkaConsumer consumer = creator().createApicurioConsumer(
+ "test-json-schema-apicurio",
+ subscriptionName);
+ testJsonSchemaProducer(consumer, APICURIO_PATH);
+ }
+
+ @Test
+ public void testApicurioJsonSchemaConsumer() {
+ String topic = "test-json-schema-apicurio-consumer";
+ KafkaProducer producer = creator().createApicurioProducer("test-json-schema-apicurio-test");
+ testJsonSchemaConsumer(producer, APICURIO_PATH, topic);
+ }
+
+ protected void testJsonSchemaProducer(KafkaConsumer consumer, String path) {
+ RestAssured.given()
+ .header("content-type", "application/json")
+ .body("{\"name\":\"neo\", \"color\":\"tricolor\"}")
+ .post(path);
+ ConsumerRecord records = consumer.poll(Duration.ofMillis(20000)).iterator().next();
+ Assertions.assertEquals(records.key(), (Integer) 0);
+ Pet pet = records.value();
+ Assertions.assertEquals("neo", pet.getName());
+ Assertions.assertEquals("tricolor", pet.getColor());
+ consumer.close();
+ }
+
+ protected void testJsonSchemaConsumer(KafkaProducer producer, String path, String topic) {
+ producer.send(new ProducerRecord<>(topic, 1, createPet()));
+ Pet retrieved = RestAssured.when().get(path).as(Pet.class);
+ Assertions.assertEquals("neo", retrieved.getName());
+ Assertions.assertEquals("white", retrieved.getColor());
+ producer.close();
+ }
+
+ private Pet createPet() {
+ Pet pet = new Pet();
+ pet.setName("neo");
+ pet.setColor("white");
+ return pet;
+ }
+}
diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java
new file mode 100644
index 00000000000000..dabe27a7715ed7
--- /dev/null
+++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java
@@ -0,0 +1,39 @@
+package io.quarkus.it.kafka;
+
+import java.util.Collections;
+import java.util.Map;
+
+import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator;
+import io.quarkus.test.common.DevServicesContext;
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+
+public class KafkaResource implements QuarkusTestResourceLifecycleManager, DevServicesContext.ContextAware {
+
+ JsonSchemaKafkaCreator creator;
+
+ @Override
+ public void setIntegrationTestContext(DevServicesContext context) {
+ Map devServicesProperties = context.devServicesProperties();
+ String bootstrapServers = devServicesProperties.get("kafka.bootstrap.servers");
+ if (bootstrapServers != null) {
+ String apicurioUrl = devServicesProperties.get("mp.messaging.connector.smallrye-kafka.apicurio.registry.url");
+ creator = new JsonSchemaKafkaCreator(bootstrapServers, apicurioUrl);
+ }
+ }
+
+ @Override
+ public Map start() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public void inject(TestInjector testInjector) {
+ testInjector.injectIntoFields(
+ creator,
+ new TestInjector.MatchesType(JsonSchemaKafkaCreator.class));
+ }
+}
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 3e9da6012ddb93..25f1ca19851232 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -8,6 +8,7 @@
999-SNAPSHOT
../build-parent/pom.xml
+
4.0.0
quarkus-integration-tests-parent