From 61c1919f1610f018d65f18b882e0d09e64167ae8 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Thu, 23 Nov 2023 15:22:59 +0100 Subject: [PATCH] Add apicurio json schema serialization support --- .github/native-tests.json | 2 +- bom/application/pom.xml | 17 +- .../io/quarkus/deployment/Capability.java | 1 + .../java/io/quarkus/deployment/Feature.java | 1 + devtools/bom-descriptor-json/pom.xml | 13 + docs/pom.xml | 13 + .../client/deployment/KafkaProcessor.java | 9 +- .../apicurio/json-schema/deployment/pom.xml | 45 ++++ .../ApicurioRegistryJsonSchemaProcessor.java | 51 ++++ .../apicurio/json-schema/pom.xml | 21 ++ .../apicurio/json-schema/runtime/pom.xml | 61 +++++ .../resources/META-INF/quarkus-extension.yaml | 11 + extensions/schema-registry/apicurio/pom.xml | 1 + .../DefaultSerdeDiscoveryState.java | 12 +- ...allRyeReactiveMessagingKafkaProcessor.java | 6 +- .../kafka-json-schema-apicurio2/pom.xml | 222 ++++++++++++++++++ .../kafka/jsonschema/JsonSchemaEndpoint.java | 58 +++++ .../jsonschema/JsonSchemaKafkaCreator.java | 117 +++++++++ .../io/quarkus/it/kafka/jsonschema/Pet.java | 31 +++ .../src/main/resources/application.properties | 10 + .../src/main/resources/json-schema.json | 16 ++ .../quarkus/it/kafka/KafkaJsonSchemaIT.java | 29 +++ .../quarkus/it/kafka/KafkaJsonSchemaTest.java | 18 ++ .../it/kafka/KafkaJsonSchemaTestBase.java | 71 ++++++ .../io/quarkus/it/kafka/KafkaResource.java | 39 +++ integration-tests/pom.xml | 1 + 26 files changed, 865 insertions(+), 11 deletions(-) create mode 100644 extensions/schema-registry/apicurio/json-schema/deployment/pom.xml create mode 100644 extensions/schema-registry/apicurio/json-schema/deployment/src/main/java/io/quarkus/apicurio/registry/jsonschema/ApicurioRegistryJsonSchemaProcessor.java create mode 100644 extensions/schema-registry/apicurio/json-schema/pom.xml create mode 100644 extensions/schema-registry/apicurio/json-schema/runtime/pom.xml create mode 100644 extensions/schema-registry/apicurio/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml create mode 100644 integration-tests/kafka-json-schema-apicurio2/pom.xml create mode 100644 integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java create mode 100644 integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java create mode 100644 integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java create mode 100644 integration-tests/kafka-json-schema-apicurio2/src/main/resources/application.properties create mode 100644 integration-tests/kafka-json-schema-apicurio2/src/main/resources/json-schema.json create mode 100644 integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java create mode 100644 integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java create mode 100644 integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java create mode 100644 integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java diff --git a/.github/native-tests.json b/.github/native-tests.json index 4308dea0d13c3a..577be470bb8eb4 100644 --- a/.github/native-tests.json +++ b/.github/native-tests.json @@ -57,7 +57,7 @@ { "category": "Messaging1", "timeout": 115, - "test-modules": "kafka, kafka-ssl, kafka-sasl, kafka-avro-apicurio2, kafka-snappy, kafka-streams, reactive-messaging-kafka, kafka-oauth-keycloak", + "test-modules": "kafka, kafka-ssl, kafka-sasl, kafka-avro-apicurio2, kafka-json-schema-apicurio2, kafka-snappy, kafka-streams, reactive-messaging-kafka, kafka-oauth-keycloak", "os-name": "ubuntu-latest" }, { diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 92385c4afe3362..c7213e4efc7969 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -207,7 +207,7 @@ 2.22.0 1.3.0.Final 1.11.3 - 2.5.3.Final + 2.5.7.Final 0.1.18.Final 1.19.3 3.3.4 @@ -1379,6 +1379,16 @@ quarkus-apicurio-registry-avro-deployment ${project.version} + + io.quarkus + quarkus-apicurio-registry-json-schema + ${project.version} + + + io.quarkus + quarkus-apicurio-registry-json-schema-deployment + ${project.version} + io.quarkus quarkus-confluent-registry-common @@ -3379,6 +3389,11 @@ apicurio-registry-serdes-avro-serde ${apicurio-registry.version} + + io.apicurio + apicurio-registry-serdes-jsonschema-serde + ${apicurio-registry.version} + io.apicurio apicurio-common-rest-client-vertx diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java index 6afbe631768b1b..6cd1937771799b 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Capability.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Capability.java @@ -130,6 +130,7 @@ public interface Capability { String APICURIO_REGISTRY = QUARKUS_PREFIX + ".apicurio.registry"; String APICURIO_REGISTRY_AVRO = APICURIO_REGISTRY + ".avro"; + String APICURIO_REGISTRY_JSON_SCHEMA = APICURIO_REGISTRY + ".json"; String CONFLUENT_REGISTRY = QUARKUS_PREFIX + ".confluent.registry"; String CONFLUENT_REGISTRY_AVRO = CONFLUENT_REGISTRY + ".avro"; diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java index 1f52df61cac15a..3c7fa9726a6641 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java @@ -13,6 +13,7 @@ public enum Feature { AMAZON_LAMBDA, AZURE_FUNCTIONS, APICURIO_REGISTRY_AVRO, + APICURIO_REGISTRY_JSON_SCHEMA, AWT, CACHE, CDI, diff --git a/devtools/bom-descriptor-json/pom.xml b/devtools/bom-descriptor-json/pom.xml index 79f1e15561eedb..66b67abc53c138 100644 --- a/devtools/bom-descriptor-json/pom.xml +++ b/devtools/bom-descriptor-json/pom.xml @@ -200,6 +200,19 @@ + + io.quarkus + quarkus-apicurio-registry-json-schema + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-arc diff --git a/docs/pom.xml b/docs/pom.xml index 274c6effd891e3..07bf78b6b8df0c 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -216,6 +216,19 @@ + + io.quarkus + quarkus-apicurio-registry-json-schema-deployment + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-arc-deployment diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java index 37ea59286d6024..3a574b4424f850 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java @@ -356,12 +356,19 @@ private void handleAvro(BuildProducer reflectiveClass, "java.lang.AutoCloseable")); } - // --- Apicurio Registry 2.x --- + // --- Apicurio Registry 2.x Avro --- if (QuarkusClassLoader.isClassPresentAtRuntime("io.apicurio.registry.serde.avro.AvroKafkaDeserializer") && !capabilities.isPresent(Capability.APICURIO_REGISTRY_AVRO)) { throw new RuntimeException( "Apicurio Registry 2.x Avro classes detected, please use the quarkus-apicurio-registry-avro extension"); } + + // --- Apicurio Registry 2.x Json Schema --- + if (QuarkusClassLoader.isClassPresentAtRuntime("io.apicurio.registry.serde.avro.JsonKafkaDeserializer") + && !capabilities.isPresent(Capability.APICURIO_REGISTRY_JSON_SCHEMA)) { + throw new RuntimeException( + "Apicurio Registry 2.x Json classes detected, please use the quarkus-apicurio-registry-json extension"); + } } @BuildStep diff --git a/extensions/schema-registry/apicurio/json-schema/deployment/pom.xml b/extensions/schema-registry/apicurio/json-schema/deployment/pom.xml new file mode 100644 index 00000000000000..0fb9a83544704a --- /dev/null +++ b/extensions/schema-registry/apicurio/json-schema/deployment/pom.xml @@ -0,0 +1,45 @@ + + + 4.0.0 + + + io.quarkus + quarkus-apicurio-registry-json-schema-parent + 999-SNAPSHOT + + + quarkus-apicurio-registry-json-schema-deployment + Quarkus - Apicurio Registry - Json Schema - Deployment + + + + io.quarkus + quarkus-apicurio-registry-json-schema + + + + io.quarkus + quarkus-apicurio-registry-common-deployment + + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + + diff --git a/extensions/schema-registry/apicurio/json-schema/deployment/src/main/java/io/quarkus/apicurio/registry/jsonschema/ApicurioRegistryJsonSchemaProcessor.java b/extensions/schema-registry/apicurio/json-schema/deployment/src/main/java/io/quarkus/apicurio/registry/jsonschema/ApicurioRegistryJsonSchemaProcessor.java new file mode 100644 index 00000000000000..83c6f0886bba91 --- /dev/null +++ b/extensions/schema-registry/apicurio/json-schema/deployment/src/main/java/io/quarkus/apicurio/registry/jsonschema/ApicurioRegistryJsonSchemaProcessor.java @@ -0,0 +1,51 @@ +package io.quarkus.apicurio.registry.jsonschema; + +import io.quarkus.bootstrap.classloading.QuarkusClassLoader; +import io.quarkus.deployment.Feature; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; + +public class ApicurioRegistryJsonSchemaProcessor { + @BuildStep + FeatureBuildItem feature() { + return new FeatureBuildItem(Feature.APICURIO_REGISTRY_JSON_SCHEMA); + } + + @BuildStep + public void apicurioRegistryJsonSchema(BuildProducer reflectiveClass, + BuildProducer sslNativeSupport) { + + reflectiveClass + .produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer", + "io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer").methods().build()); + + reflectiveClass.produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy", + "io.apicurio.registry.serde.strategy.TopicIdStrategy", + "io.apicurio.registry.serde.strategy.QualifiedRecordIdStrategy", + "io.apicurio.registry.serde.strategy.RecordIdStrategy", + "io.apicurio.registry.serde.jsonschema.strategy.TopicRecordIdStrategy").methods().fields() + .build()); + + reflectiveClass.produce(ReflectiveClassBuildItem.builder("io.apicurio.registry.serde.DefaultIdHandler", + "io.apicurio.registry.serde.Legacy4ByteIdHandler", + "io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider", + "io.apicurio.registry.serde.headers.DefaultHeadersHandler").methods().fields() + .build()); + + String defaultSchemaResolver = "io.apicurio.registry.serde.DefaultSchemaResolver"; + if (QuarkusClassLoader.isClassPresentAtRuntime(defaultSchemaResolver)) { + // Class not present after 2.2.0.Final + reflectiveClass.produce(ReflectiveClassBuildItem.builder(defaultSchemaResolver).methods() + .fields().build()); + } + } + + @BuildStep + ExtensionSslNativeSupportBuildItem enableSslInNative() { + return new ExtensionSslNativeSupportBuildItem(Feature.APICURIO_REGISTRY_JSON_SCHEMA); + } + +} diff --git a/extensions/schema-registry/apicurio/json-schema/pom.xml b/extensions/schema-registry/apicurio/json-schema/pom.xml new file mode 100644 index 00000000000000..946f8f2d857f20 --- /dev/null +++ b/extensions/schema-registry/apicurio/json-schema/pom.xml @@ -0,0 +1,21 @@ + + + + quarkus-apicurio-registry-parent + io.quarkus + 999-SNAPSHOT + ../pom.xml + + + 4.0.0 + quarkus-apicurio-registry-json-schema-parent + Quarkus - Apicurio Registry - Json Schema + pom + + + deployment + runtime + + \ No newline at end of file diff --git a/extensions/schema-registry/apicurio/json-schema/runtime/pom.xml b/extensions/schema-registry/apicurio/json-schema/runtime/pom.xml new file mode 100644 index 00000000000000..cdb6a28929e451 --- /dev/null +++ b/extensions/schema-registry/apicurio/json-schema/runtime/pom.xml @@ -0,0 +1,61 @@ + + + 4.0.0 + + + io.quarkus + quarkus-apicurio-registry-json-schema-parent + 999-SNAPSHOT + + + quarkus-apicurio-registry-json-schema + Quarkus - Apicurio Registry - Json Schema - Runtime + Use Apicurio as Json schema registry + + + + io.apicurio + apicurio-registry-serdes-jsonschema-serde + + + io.apicurio + apicurio-common-rest-client-jdk + + + + + + io.quarkus + quarkus-apicurio-registry-common + + + + + + + + io.quarkus + quarkus-extension-maven-plugin + + + io.quarkus.apicurio.registry.json + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + diff --git a/extensions/schema-registry/apicurio/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/schema-registry/apicurio/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml new file mode 100644 index 00000000000000..7c0e0a875a9824 --- /dev/null +++ b/extensions/schema-registry/apicurio/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -0,0 +1,11 @@ +--- +artifact: ${project.groupId}:${project.artifactId}:${project.version} +name: "Apicurio Registry - Json Schema" +metadata: + keywords: + - "apicurio" + - "json-schema" + guide: "" + categories: + - "serialization" + status: "draft" diff --git a/extensions/schema-registry/apicurio/pom.xml b/extensions/schema-registry/apicurio/pom.xml index ce3c9d4de020f9..48249736defb50 100644 --- a/extensions/schema-registry/apicurio/pom.xml +++ b/extensions/schema-registry/apicurio/pom.xml @@ -17,5 +17,6 @@ common avro + json-schema diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java index dbe9ede291e332..3582c5bccfa372 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeDiscoveryState.java @@ -36,7 +36,7 @@ class DefaultSerdeDiscoveryState { private Boolean hasConfluent; private Boolean hasApicurio1; - private Boolean hasApicurio2; + private Boolean hasApicurio2Avro; private Boolean hasJsonb; DefaultSerdeDiscoveryState(IndexView index) { @@ -155,18 +155,18 @@ boolean hasApicurio1() { return hasApicurio1; } - boolean hasApicurio2() { - if (hasApicurio2 == null) { + boolean hasApicurio2Avro() { + if (hasApicurio2Avro == null) { try { Class.forName("io.apicurio.registry.serde.avro.AvroKafkaDeserializer", false, Thread.currentThread().getContextClassLoader()); - hasApicurio2 = true; + hasApicurio2Avro = true; } catch (ClassNotFoundException e) { - hasApicurio2 = false; + hasApicurio2Avro = false; } } - return hasApicurio2; + return hasApicurio2Avro; } boolean hasJsonb() { diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index a14824a45801f4..503b81f253dfb6 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -880,7 +880,7 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T int avroLibraries = 0; avroLibraries += discovery.hasConfluent() ? 1 : 0; avroLibraries += discovery.hasApicurio1() ? 1 : 0; - avroLibraries += discovery.hasApicurio2() ? 1 : 0; + avroLibraries += discovery.hasApicurio2Avro() ? 1 : 0; if (avroLibraries > 1) { LOGGER.debugf("Skipping Avro serde autodetection for %s, because multiple Avro serde libraries are present", typeName); @@ -897,7 +897,7 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T ? Result.of("io.apicurio.registry.utils.serde.AvroKafkaSerializer") : Result.of("io.apicurio.registry.utils.serde.AvroKafkaDeserializer") .with(isAvroGenerated, "apicurio.registry.use-specific-avro-reader", "true"); - } else if (discovery.hasApicurio2()) { + } else if (discovery.hasApicurio2Avro()) { return serializer ? Result.of("io.apicurio.registry.serde.avro.AvroKafkaSerializer") : Result.of("io.apicurio.registry.serde.avro.AvroKafkaDeserializer") @@ -908,6 +908,8 @@ private Result serializerDeserializerFor(DefaultSerdeDiscoveryState discovery, T } } + //TODO autodiscovery of json serdes + // Jackson-based serializer/deserializer // note that Jackson is always present with Kafka, so no need to check { diff --git a/integration-tests/kafka-json-schema-apicurio2/pom.xml b/integration-tests/kafka-json-schema-apicurio2/pom.xml new file mode 100644 index 00000000000000..09b6d334469f6c --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/pom.xml @@ -0,0 +1,222 @@ + + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + 4.0.0 + + quarkus-integration-test-kafka-json-schema-apicurio2 + Quarkus - Integration Tests - Kafka Json Schema with Apicurio 2.x + The Apache Kafka Json Schema with Apicurio Registry 2.x integration tests module + + + + io.quarkus + quarkus-integration-test-class-transformer + + + io.quarkus + quarkus-integration-test-shared-library + + + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-resteasy-jackson + + + + + io.quarkus + quarkus-kafka-client + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-csv + + + + io.quarkus + quarkus-apicurio-registry-json-schema + + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + jakarta.xml.bind + jakarta.xml.bind-api + + + + + io.strimzi + strimzi-test-container + test + + + org.apache.logging.log4j + log4j-core + + + + + org.testcontainers + testcontainers + test + + + + + io.quarkus + quarkus-integration-test-class-transformer-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-kafka-client-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-apicurio-registry-json-schema-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + confluent + https://packages.confluent.io/maven/ + + false + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + generate-code + build + + + + + + + maven-failsafe-plugin + + false + + + + + maven-surefire-plugin + + false + + + + + + + + test-kafka + + + test-containers + + + + + + maven-surefire-plugin + + false + + + + maven-failsafe-plugin + + false + + + + + + + + diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java new file mode 100644 index 00000000000000..f65ff696a15a26 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java @@ -0,0 +1,58 @@ +package io.quarkus.it.kafka.jsonschema; + +import java.time.Duration; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import io.vertx.core.json.JsonObject; + +/** + * Endpoint to test the Json Schema support + */ +@Path("/json-schema") +public class JsonSchemaEndpoint { + + @Inject + JsonSchemaKafkaCreator creator; + + @GET + @Path("/apicurio") + public JsonObject getApicurio() { + return get(creator.createApicurioConsumer("test-json-schema-apicurio-consumer", "test-json-schema-apicurio-consumer")); + } + + @POST + @Path("/apicurio") + public void sendApicurio(Pet pet) { + KafkaProducer p = creator.createApicurioProducer("test-json-schema-apicurio"); + send(p, pet, "test-json-schema-apicurio-producer"); + } + + private JsonObject get(KafkaConsumer consumer) { + final ConsumerRecords records = consumer.poll(Duration.ofMillis(60000)); + if (records.isEmpty()) { + return null; + } + ConsumerRecord consumerRecord = records.iterator().next(); + Pet p = consumerRecord.value(); + // We cannot serialize the returned Pet directly, it contains non-serializable object such as the schema. + JsonObject result = new JsonObject(); + result.put("name", p.getName()); + result.put("color", p.getColor()); + return result; + } + + private void send(KafkaProducer producer, Pet pet, String topic) { + producer.send(new ProducerRecord<>(topic, 0, pet)); + producer.flush(); + } +} diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java new file mode 100644 index 00000000000000..989d2f0e106672 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java @@ -0,0 +1,117 @@ +package io.quarkus.it.kafka.jsonschema; + +import java.util.Collections; +import java.util.Properties; +import java.util.UUID; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.apicurio.registry.serde.SerdeConfig; +import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer; +import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer; + +/** + * Create Json Schema Kafka Consumers and Producers + */ +@ApplicationScoped +public class JsonSchemaKafkaCreator { + + @ConfigProperty(name = "kafka.bootstrap.servers") + String bootstrap; + + @ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url") + String apicurioRegistryUrl; + + public JsonSchemaKafkaCreator() { + } + + public JsonSchemaKafkaCreator(String bootstrap, String apicurioRegistryUrl) { + this.bootstrap = bootstrap; + this.apicurioRegistryUrl = apicurioRegistryUrl; + } + + public String getApicurioRegistryUrl() { + return apicurioRegistryUrl; + } + + public KafkaConsumer createApicurioConsumer(String groupdIdConfig, String subscribtionName) { + return createApicurioConsumer(bootstrap, getApicurioRegistryUrl(), groupdIdConfig, subscribtionName); + } + + public KafkaProducer createApicurioProducer(String clientId) { + return createApicurioProducer(bootstrap, getApicurioRegistryUrl(), clientId); + } + + public static KafkaConsumer createApicurioConsumer(String bootstrap, String apicurio, + String groupdIdConfig, String subscribtionName) { + Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig); + return createConsumer(p, subscribtionName); + } + + public static KafkaProducer createApicurioProducer(String bootstrap, String apicurio, + String clientId) { + Properties p = getApicurioProducerProperties(bootstrap, apicurio, clientId); + return createProducer(p); + } + + private static KafkaConsumer createConsumer(Properties props, String subscribtionName) { + if (!props.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) { + props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + } + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList(subscribtionName)); + return consumer; + } + + private static KafkaProducer createProducer(Properties props) { + if (!props.containsKey(ProducerConfig.CLIENT_ID_CONFIG)) { + props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + } + return new KafkaProducer<>(props); + } + + public static Properties getApicurioConsumerProperties(String bootstrap, String apicurio, String groupdIdConfig) { + Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSchemaKafkaDeserializer.class.getName()); + props.put(SerdeConfig.REGISTRY_URL, apicurio); + return props; + } + + private static Properties getGenericConsumerProperties(String bootstrap, String groupdIdConfig) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupdIdConfig); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + return props; + } + + private static Properties getApicurioProducerProperties(String bootstrap, String apicurio, String clientId) { + Properties props = getGenericProducerProperties(bootstrap, clientId); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSchemaKafkaSerializer.class.getName()); + props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true); + props.put(SerdeConfig.SCHEMA_LOCATION, "json-schema.json"); + props.put(SerdeConfig.VALIDATION_ENABLED, "true"); + props.put(SerdeConfig.REGISTRY_URL, apicurio); + return props; + } + + private static Properties getGenericProducerProperties(String bootstrap, String clientId) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); + props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); + return props; + } +} diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java new file mode 100644 index 00000000000000..ee47fb2fe9482c --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java @@ -0,0 +1,31 @@ +package io.quarkus.it.kafka.jsonschema; + +public class Pet { + + private String name; + private String color; + + public Pet() { + } + + public Pet(String name, String color) { + this.name = name; + this.color = color; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getColor() { + return color; + } + + public void setColor(String color) { + this.color = color; + } +} diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/resources/application.properties b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/application.properties new file mode 100644 index 00000000000000..69d4364f6b1c5e --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/application.properties @@ -0,0 +1,10 @@ +quarkus.log.category.kafka.level=WARN +quarkus.log.category.\"org.apache.kafka\".level=WARN +quarkus.log.category.\"org.apache.zookeeper\".level=WARN + +quarkus.native.resources.includes=json-schema.json + +# enable health check +quarkus.kafka.health.enabled=true + +quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.4.2.Final diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/resources/json-schema.json b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/json-schema.json new file mode 100644 index 00000000000000..18a1c5f482bd62 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/resources/json-schema.json @@ -0,0 +1,16 @@ +{ + "$id": "https://example.com/person.schema.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Pet", + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "The pet's name." + }, + "color": { + "type": "string", + "description": "The pet's color." + } + } +} \ No newline at end of file diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java new file mode 100644 index 00000000000000..31ddb232969385 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaIT.java @@ -0,0 +1,29 @@ +package io.quarkus.it.kafka; + +import org.junit.jupiter.api.BeforeAll; + +import io.apicurio.registry.rest.client.RegistryClientFactory; +import io.apicurio.rest.client.VertxHttpClientProvider; +import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusIntegrationTest; +import io.vertx.core.Vertx; + +@QuarkusIntegrationTest +@QuarkusTestResource(value = KafkaResource.class, restrictToAnnotatedClass = true) +public class KafkaJsonSchemaIT extends KafkaJsonSchemaTestBase { + + JsonSchemaKafkaCreator creator; + + @Override + JsonSchemaKafkaCreator creator() { + return creator; + } + + @BeforeAll + public static void setUp() { + // this is for the test JVM, which also uses Kafka client, which in turn also interacts with the registry + RegistryClientFactory.setProvider(new VertxHttpClientProvider(Vertx.vertx())); + } + +} diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java new file mode 100644 index 00000000000000..606ded95dadfb1 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTest.java @@ -0,0 +1,18 @@ +package io.quarkus.it.kafka; + +import jakarta.inject.Inject; + +import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +public class KafkaJsonSchemaTest extends KafkaJsonSchemaTestBase { + + @Inject + JsonSchemaKafkaCreator creator; + + @Override + JsonSchemaKafkaCreator creator() { + return creator; + } +} diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java new file mode 100644 index 00000000000000..796540becc0a7c --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java @@ -0,0 +1,71 @@ +package io.quarkus.it.kafka; + +import java.time.Duration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; +import io.quarkus.it.kafka.jsonschema.Pet; +import io.restassured.RestAssured; + +public abstract class KafkaJsonSchemaTestBase { + + static final String APICURIO_PATH = "/json-schema/apicurio"; + + abstract JsonSchemaKafkaCreator creator(); + + @Test + public void testUrls() { + Assertions.assertTrue(creator().getApicurioRegistryUrl().endsWith("/apis/registry/v2")); + } + + @Test + public void testApicurioJsonSchemaProducer() { + String subscriptionName = "test-json-schema-apicurio-producer"; + + KafkaConsumer consumer = creator().createApicurioConsumer( + "test-json-schema-apicurio", + subscriptionName); + testJsonSchemaProducer(consumer, APICURIO_PATH); + } + + @Test + public void testApicurioJsonSchemaConsumer() { + String topic = "test-json-schema-apicurio-consumer"; + KafkaProducer producer = creator().createApicurioProducer("test-json-schema-apicurio-test"); + testJsonSchemaConsumer(producer, APICURIO_PATH, topic); + } + + protected void testJsonSchemaProducer(KafkaConsumer consumer, String path) { + RestAssured.given() + .header("content-type", "application/json") + .body("{\"name\":\"neo\", \"color\":\"tricolor\"}") + .post(path); + ConsumerRecord records = consumer.poll(Duration.ofMillis(20000)).iterator().next(); + Assertions.assertEquals(records.key(), (Integer) 0); + Pet pet = records.value(); + Assertions.assertEquals("neo", pet.getName()); + Assertions.assertEquals("tricolor", pet.getColor()); + consumer.close(); + } + + protected void testJsonSchemaConsumer(KafkaProducer producer, String path, String topic) { + producer.send(new ProducerRecord<>(topic, 1, createPet())); + Pet retrieved = RestAssured.when().get(path).as(Pet.class); + Assertions.assertEquals("neo", retrieved.getName()); + Assertions.assertEquals("white", retrieved.getColor()); + producer.close(); + } + + private Pet createPet() { + Pet pet = new Pet(); + pet.setName("neo"); + pet.setColor("white"); + return pet; + } +} diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java new file mode 100644 index 00000000000000..dabe27a7715ed7 --- /dev/null +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java @@ -0,0 +1,39 @@ +package io.quarkus.it.kafka; + +import java.util.Collections; +import java.util.Map; + +import io.quarkus.it.kafka.jsonschema.JsonSchemaKafkaCreator; +import io.quarkus.test.common.DevServicesContext; +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; + +public class KafkaResource implements QuarkusTestResourceLifecycleManager, DevServicesContext.ContextAware { + + JsonSchemaKafkaCreator creator; + + @Override + public void setIntegrationTestContext(DevServicesContext context) { + Map devServicesProperties = context.devServicesProperties(); + String bootstrapServers = devServicesProperties.get("kafka.bootstrap.servers"); + if (bootstrapServers != null) { + String apicurioUrl = devServicesProperties.get("mp.messaging.connector.smallrye-kafka.apicurio.registry.url"); + creator = new JsonSchemaKafkaCreator(bootstrapServers, apicurioUrl); + } + } + + @Override + public Map start() { + return Collections.emptyMap(); + } + + @Override + public void stop() { + } + + @Override + public void inject(TestInjector testInjector) { + testInjector.injectIntoFields( + creator, + new TestInjector.MatchesType(JsonSchemaKafkaCreator.class)); + } +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 3e9da6012ddb93..25f1ca19851232 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -8,6 +8,7 @@ 999-SNAPSHOT ../build-parent/pom.xml + 4.0.0 quarkus-integration-tests-parent