Skip to content

Commit

Permalink
Add support for Confluent Schema Registry + restructure schema regist…
Browse files Browse the repository at this point in the history
…ry extensions
  • Loading branch information
alesj authored and Ladicek committed Apr 22, 2022
1 parent b57ef3d commit 07efcb0
Showing 51 changed files with 1,085 additions and 316 deletions.
45 changes: 45 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
@@ -1208,6 +1208,16 @@
<artifactId>quarkus-avro-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-common-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
@@ -1218,6 +1228,36 @@
<artifactId>quarkus-apicurio-registry-avro-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-common-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-schema-registry-devservice</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-schema-registry-devservice-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
@@ -3091,6 +3131,11 @@
<artifactId>agroal-pool</artifactId>
<version>${agroal.version}</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
<version>${apicurio-registry.version}</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
Original file line number Diff line number Diff line change
@@ -121,4 +121,7 @@ public interface Capability {

String APICURIO_REGISTRY = QUARKUS_PREFIX + "apicurio.registry";
String APICURIO_REGISTRY_AVRO = APICURIO_REGISTRY + ".avro";

String CONFLUENT_REGISTRY = QUARKUS_PREFIX + "confluent.registry";
String CONFLUENT_REGISTRY_AVRO = CONFLUENT_REGISTRY + ".avro";
}
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ public enum Feature {
CACHE,
CDI,
CONFIG_YAML,
CONFLUENT_REGISTRY_AVRO,
ELASTICSEARCH_REST_CLIENT_COMMON,
ELASTICSEARCH_REST_CLIENT,
ELASTICSEARCH_REST_HIGH_LEVEL_CLIENT,
13 changes: 13 additions & 0 deletions devtools/bom-descriptor-json/pom.xml
Original file line number Diff line number Diff line change
@@ -292,6 +292,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-container-image</artifactId>
13 changes: 13 additions & 0 deletions docs/pom.xml
Original file line number Diff line number Diff line change
@@ -252,6 +252,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-container-image-deployment</artifactId>
28 changes: 23 additions & 5 deletions docs/src/main/asciidoc/apicurio-registry-dev-services.adoc
Original file line number Diff line number Diff line change
@@ -7,25 +7,43 @@ https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc

include::./attributes.adoc[]

If the `quarkus-apicurio-registry-avro` extension is present, Dev Services for Apicurio Registry automatically starts an Apicurio Registry instance in dev mode and when running tests.
If an extension for schema registry, such as `quarkus-apicurio-registry-avro` or `quarkus-confluent-registry-avro`, is present, Dev Services for Apicurio Registry automatically starts an Apicurio Registry instance in dev mode and when running tests.
Also, all Kafka channels in SmallRye Reactive Messaging are automatically configured to use this registry.
(This automatic configuration of course only applies to serializers and deserializers from the Apicurio Registry Avro library.)
This automatic configuration only applies to serializers and deserializers from Apicurio Registry serde libraries and Confluent Schema Registry serde libraries, because there properties are set:

[source,properties]
----
# for Apicurio Registry serde
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2
# for Confluent Schema Registry serde
mp.messaging.connector.smallrye-kafka.schema.registry.url=http://localhost:8081/apis/ccompat/v6
----


== Enabling / Disabling Dev Services for Apicurio Registry

Dev Services for Apicurio Registry is automatically enabled unless:

- `quarkus.apicurio-registry.devservices.enabled` is set to `false`
- `mp.messaging.connector.smallrye-kafka.apicurio.registry.url` is configured
- all the Reactive Messaging Kafka channels have the `apicurio.registry.url` attribute set
- `mp.messaging.connector.smallrye-kafka.schema.registry.url` is configured
- all the Reactive Messaging Kafka channels have either the `apicurio.registry.url` attribute or the `schema.registry.url` attribute set

Dev Services for Apicurio Registry relies on Docker to start the registry.
If your environment does not support Docker, you will need to start the registry manually, or use an already running registry.
You can configure the registry URL for all Kafka channels in SmallRye Reactive Messaging with a single property:
In such case, you can configure the registry URL for all Kafka channels in SmallRye Reactive Messaging with a single property.
For Apicurio Registry serde, that is:

[source,properties]
----
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=... your Apicuio Registry URL...
----

For Confluent Schema Registry serde, that is:

[source,properties]
----
mp.messaging.connector.smallrye-kafka.schema.registry.url=... your Confluent Schema Registry URL...
----

== Shared registry
18 changes: 9 additions & 9 deletions docs/src/main/asciidoc/kafka-schema-registry-avro.adoc
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc
include::./attributes.adoc[]

This guide shows how your Quarkus application can use Apache Kafka, http://avro.apache.org/docs/current/[Avro] serialized
records, and connect to a schema registry (such as the https://docs.confluent.io/platform/current/schema-registry/index.html[Confluent Schema Registry] or https://www.apicur.io/registry/[Apicurio Registry].
records, and connect to a schema registry (such as the https://docs.confluent.io/platform/current/schema-registry/index.html[Confluent Schema Registry] or https://www.apicur.io/registry/[Apicurio Registry]).

If you are not familiar with Kafka and Kafka in Quarkus in particular, consider
first going through the xref:kafka.adoc[Using Apache Kafka with Reactive Messaging] guide.
@@ -52,19 +52,17 @@ include::includes/devtools/create-app.adoc[]
[TIP]
====
If you use Confluent Schema Registry, you don't need the `quarkus-apicurio-registry-avro` extension.
Instead, you need the following dependencies and the Confluent Maven repository added
to your build file:
Instead, you need the `quarkus-confluent-registry-avro` extension and a few more dependencies.
Also, you need to add the Confluent Maven repository to your build file:
[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"]
.pom.xml
----
<dependencies>
...
<!-- Quarkus extension for generating Java code from Avro schemas -->
<!-- with this, you don't have to use avro-maven-plugin -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-avro</artifactId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
</dependency>
<!-- Confluent registry libraries use JAX-RS client -->
<dependency>
@@ -109,8 +107,7 @@ repositories {
dependencies {
...
// Quarkus extension for generating Java code from Avro schemas
implementation("io.quarkus:quarkus-avro")
implementation("io.quarkus:quarkus-confluent-registry-avro")
// Confluent registry libraries use JAX-RS client
implementation("io.quarkus:quarkus-rest-client-reactive")
@@ -312,7 +309,10 @@ See xref:kafka-dev-services.adoc[Dev Services for Kafka] and xref:apicurio-regis
You might have noticed that we didn't configure the schema registry URL anywhere.
This is because Dev Services for Apicurio Registry configures all Kafka channels in SmallRye Reactive Messaging to use the automatically started registry instance.
There's no Dev Services support for Confluent Schema Registry.
Apicurio Registry, in addition to its native API, also exposes an endpoint that is API-compatible with Confluent Schema Registry.
Therefore, this automatic configuration works both for Apicurio Registry serde and Confluent Schema Registry serde.
However, note that there's no Dev Services support for running Confluent Schema Registry itself.
If you want to use a running instance of Confluent Schema Registry, configure its URL, together with the URL of a Kafka broker:
[source,properties]
Original file line number Diff line number Diff line change
@@ -337,51 +337,10 @@ private void handleAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
// Avro - for both Confluent and Apicurio

// --- Confluent ---
if (QuarkusClassLoader.isClassPresentAtRuntime("io.confluent.kafka.serializers.KafkaAvroDeserializer")) {
reflectiveClass
.produce(new ReflectiveClassBuildItem(true, false,
"io.confluent.kafka.serializers.KafkaAvroDeserializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer"));

reflectiveClass
.produce(new ReflectiveClassBuildItem(true, false, false,
"io.confluent.kafka.serializers.context.NullContextNameStrategy"));

reflectiveClass
.produce(new ReflectiveClassBuildItem(true, true, false,
"io.confluent.kafka.serializers.subject.TopicNameStrategy",
"io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
"io.confluent.kafka.serializers.subject.RecordNameStrategy"));

reflectiveClass
.produce(new ReflectiveClassBuildItem(true, true, false,
"io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage",
"io.confluent.kafka.schemaregistry.client.rest.entities.Schema",
"io.confluent.kafka.schemaregistry.client.rest.entities.Config",
"io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference",
"io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString",
"io.confluent.kafka.schemaregistry.client.rest.entities.SchemaTypeConverter",
"io.confluent.kafka.schemaregistry.client.rest.entities.ServerClusterId",
"io.confluent.kafka.schemaregistry.client.rest.entities.SujectVersion"));

reflectiveClass
.produce(new ReflectiveClassBuildItem(true, true, false,
"io.confluent.kafka.schemaregistry.client.rest.entities.requests.CompatibilityCheckResponse",
"io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest",
"io.confluent.kafka.schemaregistry.client.rest.entities.requests.ModeGetResponse",
"io.confluent.kafka.schemaregistry.client.rest.entities.requests.ModeUpdateRequest",
"io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest",
"io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse"));
}

if (QuarkusClassLoader.isClassPresentAtRuntime(
"io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider")) {
serviceProviders
.produce(new ServiceProviderBuildItem(
"io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider",
"io.confluent.kafka.schemaregistry.client.security.basicauth.SaslBasicAuthCredentialProvider",
"io.confluent.kafka.schemaregistry.client.security.basicauth.UrlBasicAuthCredentialProvider",
"io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider"));
if (QuarkusClassLoader.isClassPresentAtRuntime("io.confluent.kafka.serializers.KafkaAvroDeserializer")
&& !capabilities.isPresent(Capability.CONFLUENT_REGISTRY_AVRO)) {
throw new RuntimeException(
"Confluent Avro classes detected, please use the quarkus-confluent-registry-avro extension");
}

// --- Apicurio Registry 1.x ---
2 changes: 1 addition & 1 deletion extensions/pom.xml
Original file line number Diff line number Diff line change
@@ -99,7 +99,7 @@
<module>kafka-streams</module>
<module>mongodb-client</module>
<module>avro</module>
<module>apicurio-registry-avro</module>
<module>schema-registry</module>
<module>devservices</module>

<!-- Spring -->
49 changes: 49 additions & 0 deletions extensions/schema-registry/apicurio/avro/deployment/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro-parent</artifactId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-apicurio-registry-avro-deployment</artifactId>
<name>Quarkus - Apicurio Registry - Avro - Deployment</name>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-common-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-avro-deployment</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.quarkus.apicurio.registry.avro;

import io.quarkus.deployment.Feature;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;

public class ApicurioRegistryAvroProcessor {
@BuildStep
FeatureBuildItem feature() {
return new FeatureBuildItem(Feature.APICURIO_REGISTRY_AVRO);
}

@BuildStep
public void apicurioRegistryAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<ExtensionSslNativeSupportBuildItem> sslNativeSupport) {

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false,
"io.apicurio.registry.serde.avro.AvroKafkaDeserializer",
"io.apicurio.registry.serde.avro.AvroKafkaSerializer"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, true,
"io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy",
"io.apicurio.registry.serde.strategy.TopicIdStrategy",
"io.apicurio.registry.serde.avro.DefaultAvroDatumProvider",
"io.apicurio.registry.serde.avro.ReflectAvroDatumProvider",
"io.apicurio.registry.serde.avro.strategy.RecordIdStrategy",
"io.apicurio.registry.serde.avro.strategy.TopicRecordIdStrategy"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, true,
"io.apicurio.registry.serde.DefaultSchemaResolver",
"io.apicurio.registry.serde.DefaultIdHandler",
"io.apicurio.registry.serde.Legacy4ByteIdHandler",
"io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider",
"io.apicurio.registry.serde.headers.DefaultHeadersHandler"));
}

@BuildStep
ExtensionSslNativeSupportBuildItem enableSslInNative() {
return new ExtensionSslNativeSupportBuildItem(Feature.APICURIO_REGISTRY_AVRO);
}

}
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>quarkus-extensions-parent</artifactId>
<artifactId>quarkus-apicurio-registry-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
Loading

0 comments on commit 07efcb0

Please sign in to comment.