From 2700391399f683a9c6d2b33f59ef6740a77bb632 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Wed, 20 Dec 2023 18:01:09 +0100 Subject: [PATCH] Add confluent json serde support --- bom/application/pom.xml | 11 ++- .../java/io/quarkus/deployment/Feature.java | 1 + devtools/bom-descriptor-json/pom.xml | 13 +++ docs/pom.xml | 13 +++ .../confluent/json-schema/deployment/pom.xml | 49 ++++++++++ .../json/ConfluentRegistryJsonProcessor.java | 76 ++++++++++++++++ .../confluent/json-schema/pom.xml | 21 +++++ .../confluent/json-schema/runtime/pom.xml | 90 +++++++++++++++++++ .../json/ConfluentJsonSubstitutions.java | 72 +++++++++++++++ .../resources/META-INF/quarkus-extension.yaml | 10 +++ extensions/schema-registry/confluent/pom.xml | 21 +++++ .../kafka-json-schema-apicurio2/pom.xml | 40 ++++++++- .../kafka/jsonschema/JsonSchemaEndpoint.java | 14 +++ .../jsonschema/JsonSchemaKafkaCreator.java | 50 ++++++++++- .../io/quarkus/it/kafka/jsonschema/Pet.java | 20 +++++ .../it/kafka/KafkaJsonSchemaTestBase.java | 16 ++++ .../io/quarkus/it/kafka/KafkaResource.java | 3 +- 17 files changed, 516 insertions(+), 4 deletions(-) create mode 100644 extensions/schema-registry/confluent/json-schema/deployment/pom.xml create mode 100644 extensions/schema-registry/confluent/json-schema/deployment/src/main/java/io/quarkus/confluent/registry/json/ConfluentRegistryJsonProcessor.java create mode 100644 extensions/schema-registry/confluent/json-schema/pom.xml create mode 100644 extensions/schema-registry/confluent/json-schema/runtime/pom.xml create mode 100644 extensions/schema-registry/confluent/json-schema/runtime/src/main/java/io/quarkus/confluent/registry/json/ConfluentJsonSubstitutions.java create mode 100644 extensions/schema-registry/confluent/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 5f261e167e978..f388fb3676652 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -1404,6 +1404,16 @@ quarkus-confluent-registry-avro ${project.version} + + io.quarkus + quarkus-confluent-registry-json-schema + ${project.version} + + + io.quarkus + quarkus-confluent-registry-json-schema-deployment + ${project.version} + io.quarkus quarkus-confluent-registry-avro-deployment @@ -3399,7 +3409,6 @@ apicurio-common-rest-client-vertx ${apicurio-common-rest-client.version} - io.quarkus quarkus-mutiny diff --git a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java index 3c7fa9726a664..69c785ab424aa 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/Feature.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/Feature.java @@ -19,6 +19,7 @@ public enum Feature { CDI, CONFIG_YAML, CONFLUENT_REGISTRY_AVRO, + CONFLUENT_REGISTRY_JSON, ELASTICSEARCH_REST_CLIENT_COMMON, ELASTICSEARCH_REST_CLIENT, ELASTICSEARCH_REST_HIGH_LEVEL_CLIENT, diff --git a/devtools/bom-descriptor-json/pom.xml b/devtools/bom-descriptor-json/pom.xml index 66b67abc53c13..fdbe228c16269 100644 --- a/devtools/bom-descriptor-json/pom.xml +++ b/devtools/bom-descriptor-json/pom.xml @@ -330,6 +330,19 @@ + + io.quarkus + quarkus-confluent-registry-json-schema + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-container-image diff --git a/docs/pom.xml b/docs/pom.xml index 07bf78b6b8df0..080a5326bfa27 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -346,6 +346,19 @@ + + io.quarkus + quarkus-confluent-registry-json-schema-deployment + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-container-image-deployment diff --git a/extensions/schema-registry/confluent/json-schema/deployment/pom.xml b/extensions/schema-registry/confluent/json-schema/deployment/pom.xml new file mode 100644 index 0000000000000..94501c8f5d7c0 --- /dev/null +++ b/extensions/schema-registry/confluent/json-schema/deployment/pom.xml @@ -0,0 +1,49 @@ + + + 4.0.0 + + + io.quarkus + quarkus-confluent-registry-json-schema-parent + 999-SNAPSHOT + + + quarkus-confluent-registry-json-schema-deployment + Quarkus - Confluent Schema Registry - Json Schema - Deployment + + + + io.quarkus + quarkus-confluent-registry-json-schema + + + + io.quarkus + quarkus-confluent-registry-common-deployment + + + io.quarkus + quarkus-schema-registry-devservice-deployment + + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + + diff --git a/extensions/schema-registry/confluent/json-schema/deployment/src/main/java/io/quarkus/confluent/registry/json/ConfluentRegistryJsonProcessor.java b/extensions/schema-registry/confluent/json-schema/deployment/src/main/java/io/quarkus/confluent/registry/json/ConfluentRegistryJsonProcessor.java new file mode 100644 index 0000000000000..0edb834b39656 --- /dev/null +++ b/extensions/schema-registry/confluent/json-schema/deployment/src/main/java/io/quarkus/confluent/registry/json/ConfluentRegistryJsonProcessor.java @@ -0,0 +1,76 @@ +package io.quarkus.confluent.registry.json; + +import java.util.Collection; +import java.util.Optional; +import java.util.function.Predicate; + +import org.jboss.logging.Logger; + +import io.quarkus.deployment.Feature; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.nativeimage.NativeImageConfigBuildItem; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.deployment.pkg.builditem.CurateOutcomeBuildItem; +import io.quarkus.maven.dependency.ResolvedDependency; + +public class ConfluentRegistryJsonProcessor { + + public static final String CONFLUENT_GROUP_ID = "io.confluent"; + public static final String CONFLUENT_ARTIFACT_ID = "kafka-json-schema-serializer"; + + private static final Logger LOGGER = Logger.getLogger(ConfluentRegistryJsonProcessor.class.getName()); + public static final String CONFLUENT_REPO = "https://packages.confluent.io/maven/"; + public static final String GUIDE_URL = "https://quarkus.io/guides/kafka-schema-registry-json-schema"; + + @BuildStep + FeatureBuildItem featureAndCheckDependency(CurateOutcomeBuildItem cp) { + if (findConfluentSerde(cp.getApplicationModel().getDependencies()).isEmpty()) { + LOGGER.warnf("The application uses the `quarkus-confluent-registry-json-schema` extension, but does not " + + "depend on `%s:%s`. Note that this dependency is only available from the `%s` Maven " + + "repository. Check %s for more details.", + CONFLUENT_GROUP_ID, CONFLUENT_ARTIFACT_ID, CONFLUENT_REPO, GUIDE_URL); + } + + return new FeatureBuildItem(Feature.CONFLUENT_REGISTRY_JSON); + } + + @BuildStep + public void confluentRegistryJson(BuildProducer reflectiveClass, + BuildProducer sslNativeSupport) { + reflectiveClass + .produce(ReflectiveClassBuildItem.builder("io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer", + "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer").methods().build()); + } + + @BuildStep + public void configureNative(BuildProducer config, CurateOutcomeBuildItem cp) { + Optional serde = findConfluentSerde(cp.getApplicationModel().getDependencies()); + if (serde.isPresent()) { + String version = serde.get().getVersion(); + if (version.startsWith("7.1") || version.startsWith("7.2")) { + // Only required for Confluent Serde 7.1.x and 7.2.x + config.produce(NativeImageConfigBuildItem.builder() + .addRuntimeInitializedClass("io.confluent.kafka.schemaregistry.client.rest.utils.UrlList") + .build()); + } + } + } + + @BuildStep + ExtensionSslNativeSupportBuildItem enableSslInNative() { + return new ExtensionSslNativeSupportBuildItem(Feature.CONFLUENT_REGISTRY_JSON); + } + + private Optional findConfluentSerde(Collection dependencies) { + return dependencies.stream().filter(new Predicate() { + @Override + public boolean test(ResolvedDependency rd) { + return rd.getGroupId().equalsIgnoreCase(CONFLUENT_GROUP_ID) + && rd.getArtifactId().equalsIgnoreCase(CONFLUENT_ARTIFACT_ID); + } + }).findAny(); + } +} diff --git a/extensions/schema-registry/confluent/json-schema/pom.xml b/extensions/schema-registry/confluent/json-schema/pom.xml new file mode 100644 index 0000000000000..cdfaed3577c55 --- /dev/null +++ b/extensions/schema-registry/confluent/json-schema/pom.xml @@ -0,0 +1,21 @@ + + + + quarkus-confluent-registry-parent + io.quarkus + 999-SNAPSHOT + ../pom.xml + + + 4.0.0 + quarkus-confluent-registry-json-schema-parent + Quarkus - Confluent Schema Registry - Json Schema + pom + + + deployment + runtime + + diff --git a/extensions/schema-registry/confluent/json-schema/runtime/pom.xml b/extensions/schema-registry/confluent/json-schema/runtime/pom.xml new file mode 100644 index 0000000000000..c84a5d24fb394 --- /dev/null +++ b/extensions/schema-registry/confluent/json-schema/runtime/pom.xml @@ -0,0 +1,90 @@ + + + 4.0.0 + + + io.quarkus + quarkus-confluent-registry-json-schema-parent + 999-SNAPSHOT + + + quarkus-confluent-registry-json-schema + Quarkus - Confluent Schema Registry - Json Schema - Runtime + Use Confluent as Json Schema schema registry + + + + + io.quarkus + quarkus-confluent-registry-common + + + io.quarkus + quarkus-schema-registry-devservice + + + io.confluent + kafka-json-schema-serializer + 7.5.1 + + + org.checkerframework + checker-qual + + + commons-logging + commons-logging + + + validation-api + javax.validation + + + + + org.graalvm.sdk + graal-sdk + provided + + + + + + confluent + https://packages.confluent.io/maven/ + + false + + + + + + + + + io.quarkus + quarkus-extension-maven-plugin + + + io.quarkus.confluent.registry.json + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + + diff --git a/extensions/schema-registry/confluent/json-schema/runtime/src/main/java/io/quarkus/confluent/registry/json/ConfluentJsonSubstitutions.java b/extensions/schema-registry/confluent/json-schema/runtime/src/main/java/io/quarkus/confluent/registry/json/ConfluentJsonSubstitutions.java new file mode 100644 index 0000000000000..96bb8fee05ae9 --- /dev/null +++ b/extensions/schema-registry/confluent/json-schema/runtime/src/main/java/io/quarkus/confluent/registry/json/ConfluentJsonSubstitutions.java @@ -0,0 +1,72 @@ +package io.quarkus.confluent.registry.json; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.oracle.svm.core.annotate.Substitute; +import com.oracle.svm.core.annotate.TargetClass; + +import io.confluent.kafka.schemaregistry.annotations.Schema; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference; +import io.confluent.kafka.schemaregistry.json.JsonSchema; +import io.confluent.kafka.schemaregistry.json.SpecificationVersion; + +@TargetClass(className = "io.confluent.kafka.schemaregistry.json.JsonSchemaUtils") +final class Target_io_confluent_kafka_schemaregistry_json_JsonSchemaUtils { + + @Substitute + public static JsonSchema getSchema( + Object object, + SpecificationVersion specVersion, + boolean useOneofForNullables, + boolean failUnknownProperties, + ObjectMapper objectMapper, + SchemaRegistryClient client) throws IOException { + + if (object == null) { + return null; + } + + Class cls = object.getClass(); + //We only support the scenario of having the schema defined in the annotation in the java bean, since it does not rely on outdated libraries. + if (cls.isAnnotationPresent(Schema.class)) { + Schema schema = cls.getAnnotation(Schema.class); + List references = Arrays.stream(schema.refs()) + .map(new Function() { + @Override + public SchemaReference apply( + io.confluent.kafka.schemaregistry.annotations.SchemaReference schemaReference) { + return new SchemaReference(schemaReference.name(), schemaReference.subject(), + schemaReference.version()); + } + }) + .collect(Collectors.toList()); + if (client == null) { + if (!references.isEmpty()) { + throw new IllegalArgumentException("Cannot resolve schema " + schema.value() + + " with refs " + references); + } + return new JsonSchema(schema.value()); + } else { + return (JsonSchema) client.parseSchema(JsonSchema.TYPE, schema.value(), references) + .orElseThrow(new Supplier() { + @Override + public IOException get() { + return new IOException("Invalid schema " + schema.value() + + " with refs " + references); + } + }); + } + } + return null; + } +} + +class ConfluentJsonSubstitutions { +} diff --git a/extensions/schema-registry/confluent/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/schema-registry/confluent/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml new file mode 100644 index 0000000000000..46e9af1a164e4 --- /dev/null +++ b/extensions/schema-registry/confluent/json-schema/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -0,0 +1,10 @@ +--- +artifact: ${project.groupId}:${project.artifactId}:${project.version} +name: "Confluent Schema Registry - Json Schema" +metadata: + keywords: + - "confluent" + - "json-schema" + categories: + - "serialization" + status: "preview" diff --git a/extensions/schema-registry/confluent/pom.xml b/extensions/schema-registry/confluent/pom.xml index 08e3f6c6262ee..f1f3fd770436f 100644 --- a/extensions/schema-registry/confluent/pom.xml +++ b/extensions/schema-registry/confluent/pom.xml @@ -15,8 +15,29 @@ Quarkus - Confluent Schema Registry pom + + + + joda-time + joda-time + 2.10.14 + + + org.jetbrains.kotlin + kotlin-scripting-compiler-embeddable + 1.6.0 + + + org.json + json + 20230227 + + + + common avro + json-schema diff --git a/integration-tests/kafka-json-schema-apicurio2/pom.xml b/integration-tests/kafka-json-schema-apicurio2/pom.xml index 88a9216d54975..7fa3b388a9ff8 100644 --- a/integration-tests/kafka-json-schema-apicurio2/pom.xml +++ b/integration-tests/kafka-json-schema-apicurio2/pom.xml @@ -13,6 +13,26 @@ Quarkus - Integration Tests - Kafka Json Schema with Apicurio 2.x The Apache Kafka Json Schema with Apicurio Registry 2.x integration tests module + + + + joda-time + joda-time + 2.10.14 + + + org.jetbrains.kotlin + kotlin-scripting-compiler-embeddable + 1.6.0 + + + org.json + json + 20230227 + + + + io.quarkus @@ -43,11 +63,16 @@ com.fasterxml.jackson.dataformat jackson-dataformat-csv - + + io.quarkus quarkus-apicurio-registry-json-schema + + io.quarkus + quarkus-confluent-registry-json-schema + @@ -149,6 +174,19 @@ + + io.quarkus + quarkus-confluent-registry-json-schema-deployment + ${project.version} + pom + test + + + * + * + + + diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java index f65ff696a15a2..31c114e1b583e 100644 --- a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaEndpoint.java @@ -24,6 +24,20 @@ public class JsonSchemaEndpoint { @Inject JsonSchemaKafkaCreator creator; + @GET + @Path("/confluent") + public JsonObject getConfluent() { + return get( + creator.createConfluentConsumer("test-json-schema-confluent-consumer", "test-json-schema-confluent-consumer")); + } + + @POST + @Path("/confluent") + public void sendConfluent(Pet pet) { + KafkaProducer p = creator.createConfluentProducer("test-json-schema-confluent"); + send(p, pet, "test-json-schema-confluent-producer"); + } + @GET @Path("/apicurio") public JsonObject getApicurio() { diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java index 989d2f0e10667..119beaf837785 100644 --- a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/JsonSchemaKafkaCreator.java @@ -17,6 +17,10 @@ import io.apicurio.registry.serde.SerdeConfig; import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer; import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.serializers.KafkaJsonDeserializerConfig; +import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer; +import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer; /** * Create Json Schema Kafka Consumers and Producers @@ -30,18 +34,34 @@ public class JsonSchemaKafkaCreator { @ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url") String apicurioRegistryUrl; + @ConfigProperty(name = "mp.messaging.connector.smallrye-kafka.schema.registry.url") + String confluentRegistryUrl; + public JsonSchemaKafkaCreator() { } - public JsonSchemaKafkaCreator(String bootstrap, String apicurioRegistryUrl) { + public JsonSchemaKafkaCreator(String bootstrap, String apicurioRegistryUrl, String confluentRegistryUrl) { this.bootstrap = bootstrap; this.apicurioRegistryUrl = apicurioRegistryUrl; + this.confluentRegistryUrl = confluentRegistryUrl; } public String getApicurioRegistryUrl() { return apicurioRegistryUrl; } + public String getConfluentRegistryUrl() { + return confluentRegistryUrl; + } + + public KafkaConsumer createConfluentConsumer(String groupdIdConfig, String subscribtionName) { + return createConfluentConsumer(bootstrap, getConfluentRegistryUrl(), groupdIdConfig, subscribtionName); + } + + public KafkaProducer createConfluentProducer(String clientId) { + return createConfluentProducer(bootstrap, getConfluentRegistryUrl(), clientId); + } + public KafkaConsumer createApicurioConsumer(String groupdIdConfig, String subscribtionName) { return createApicurioConsumer(bootstrap, getApicurioRegistryUrl(), groupdIdConfig, subscribtionName); } @@ -50,6 +70,12 @@ public KafkaProducer createApicurioProducer(String clientId) { return createApicurioProducer(bootstrap, getApicurioRegistryUrl(), clientId); } + public static KafkaConsumer createConfluentConsumer(String bootstrap, String confluent, + String groupdIdConfig, String subscribtionName) { + Properties p = getConfluentConsumerProperties(bootstrap, confluent, groupdIdConfig); + return createConsumer(p, subscribtionName); + } + public static KafkaConsumer createApicurioConsumer(String bootstrap, String apicurio, String groupdIdConfig, String subscribtionName) { Properties p = getApicurioConsumerProperties(bootstrap, apicurio, groupdIdConfig); @@ -62,6 +88,12 @@ public static KafkaProducer createApicurioProducer(String bootstra return createProducer(p); } + public static KafkaProducer createConfluentProducer(String bootstrap, String confluent, + String clientId) { + Properties p = getConfluentProducerProperties(bootstrap, confluent, clientId); + return createProducer(p); + } + private static KafkaConsumer createConsumer(Properties props, String subscribtionName) { if (!props.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)) { props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); @@ -78,6 +110,15 @@ private static KafkaProducer createProducer(Properties props) { return new KafkaProducer<>(props); } + private static Properties getConfluentConsumerProperties(String bootstrap, String confluent, + String groupdIdConfig) { + Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class.getName()); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluent); + props.put(KafkaJsonDeserializerConfig.JSON_VALUE_TYPE, Pet.class.getName()); + return props; + } + public static Properties getApicurioConsumerProperties(String bootstrap, String apicurio, String groupdIdConfig) { Properties props = getGenericConsumerProperties(bootstrap, groupdIdConfig); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSchemaKafkaDeserializer.class.getName()); @@ -107,6 +148,13 @@ private static Properties getApicurioProducerProperties(String bootstrap, String return props; } + private static Properties getConfluentProducerProperties(String bootstrap, String confluent, String clientId) { + Properties props = getGenericProducerProperties(bootstrap, clientId); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class.getName()); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluent); + return props; + } + private static Properties getGenericProducerProperties(String bootstrap, String clientId) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); diff --git a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java index ee47fb2fe9482..fd53166cae9b7 100644 --- a/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java +++ b/integration-tests/kafka-json-schema-apicurio2/src/main/java/io/quarkus/it/kafka/jsonschema/Pet.java @@ -1,5 +1,25 @@ package io.quarkus.it.kafka.jsonschema; +import io.confluent.kafka.schemaregistry.annotations.Schema; + +//This class is used by both serializers, but for it to be usable by the Confluent serializer the schema must be attached here in the annotation +@Schema(value = """ + { + "$id": "https://example.com/person.schema.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Pet", + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "The pet's name." + }, + "color": { + "type": "string", + "description": "The pet's color." + } + } + }""", refs = {}) public class Pet { private String name; diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java index 796540becc0a7..729b8956fd47e 100644 --- a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaJsonSchemaTestBase.java @@ -17,6 +17,8 @@ public abstract class KafkaJsonSchemaTestBase { static final String APICURIO_PATH = "/json-schema/apicurio"; + static final String CONFLUENT_PATH = "/json-schema/confluent"; + abstract JsonSchemaKafkaCreator creator(); @Test @@ -41,6 +43,20 @@ public void testApicurioJsonSchemaConsumer() { testJsonSchemaConsumer(producer, APICURIO_PATH, topic); } + @Test + public void testConfluentJsonSchemaProducer() { + KafkaConsumer consumer = creator().createConfluentConsumer( + "test-json-schema-confluent", + "test-json-schema-confluent-producer"); + testJsonSchemaProducer(consumer, CONFLUENT_PATH); + } + + @Test + public void testConfluentJsonSchemaConsumer() { + KafkaProducer producer = creator().createConfluentProducer("test-json-schema-confluent-test"); + testJsonSchemaConsumer(producer, CONFLUENT_PATH, "test-json-schema-confluent-consumer"); + } + protected void testJsonSchemaProducer(KafkaConsumer consumer, String path) { RestAssured.given() .header("content-type", "application/json") diff --git a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java index dabe27a7715ed..652fdc47b9641 100644 --- a/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java +++ b/integration-tests/kafka-json-schema-apicurio2/src/test/java/io/quarkus/it/kafka/KafkaResource.java @@ -17,7 +17,8 @@ public void setIntegrationTestContext(DevServicesContext context) { String bootstrapServers = devServicesProperties.get("kafka.bootstrap.servers"); if (bootstrapServers != null) { String apicurioUrl = devServicesProperties.get("mp.messaging.connector.smallrye-kafka.apicurio.registry.url"); - creator = new JsonSchemaKafkaCreator(bootstrapServers, apicurioUrl); + String confluentUrl = devServicesProperties.get("mp.messaging.connector.smallrye-kafka.schema.registry.url"); + creator = new JsonSchemaKafkaCreator(bootstrapServers, apicurioUrl, confluentUrl); } }