Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Confluent Schema Registry + restructure schema registry extensions #24526

Merged
merged 1 commit into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,16 @@
<artifactId>quarkus-avro-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-common-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
Expand All @@ -1218,6 +1228,36 @@
<artifactId>quarkus-apicurio-registry-avro-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-common-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-schema-registry-devservice</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-schema-registry-devservice-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
Expand Down Expand Up @@ -3091,6 +3131,11 @@
<artifactId>agroal-pool</artifactId>
<version>${agroal.version}</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
<version>${apicurio-registry.version}</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,7 @@ public interface Capability {

String APICURIO_REGISTRY = QUARKUS_PREFIX + "apicurio.registry";
String APICURIO_REGISTRY_AVRO = APICURIO_REGISTRY + ".avro";

String CONFLUENT_REGISTRY = QUARKUS_PREFIX + "confluent.registry";
String CONFLUENT_REGISTRY_AVRO = CONFLUENT_REGISTRY + ".avro";
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public enum Feature {
CACHE,
CDI,
CONFIG_YAML,
CONFLUENT_REGISTRY_AVRO,
ELASTICSEARCH_REST_CLIENT_COMMON,
ELASTICSEARCH_REST_CLIENT,
ELASTICSEARCH_REST_HIGH_LEVEL_CLIENT,
Expand Down
13 changes: 13 additions & 0 deletions devtools/bom-descriptor-json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-container-image</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-container-image-deployment</artifactId>
Expand Down
28 changes: 23 additions & 5 deletions docs/src/main/asciidoc/apicurio-registry-dev-services.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,43 @@ https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc

include::./attributes.adoc[]

If the `quarkus-apicurio-registry-avro` extension is present, Dev Services for Apicurio Registry automatically starts an Apicurio Registry instance in dev mode and when running tests.
If an extension for schema registry, such as `quarkus-apicurio-registry-avro` or `quarkus-confluent-registry-avro`, is present, Dev Services for Apicurio Registry automatically starts an Apicurio Registry instance in dev mode and when running tests.
Also, all Kafka channels in SmallRye Reactive Messaging are automatically configured to use this registry.
(This automatic configuration of course only applies to serializers and deserializers from the Apicurio Registry Avro library.)
This automatic configuration only applies to serializers and deserializers from Apicurio Registry serde libraries and Confluent Schema Registry serde libraries, because there properties are set:

[source,properties]
----
# for Apicurio Registry serde
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2
# for Confluent Schema Registry serde
mp.messaging.connector.smallrye-kafka.schema.registry.url=http://localhost:8081/apis/ccompat/v6
----


== Enabling / Disabling Dev Services for Apicurio Registry

Dev Services for Apicurio Registry is automatically enabled unless:

- `quarkus.apicurio-registry.devservices.enabled` is set to `false`
- `mp.messaging.connector.smallrye-kafka.apicurio.registry.url` is configured
- all the Reactive Messaging Kafka channels have the `apicurio.registry.url` attribute set
- `mp.messaging.connector.smallrye-kafka.schema.registry.url` is configured
- all the Reactive Messaging Kafka channels have either the `apicurio.registry.url` attribute or the `schema.registry.url` attribute set

Dev Services for Apicurio Registry relies on Docker to start the registry.
If your environment does not support Docker, you will need to start the registry manually, or use an already running registry.
You can configure the registry URL for all Kafka channels in SmallRye Reactive Messaging with a single property:
In such case, you can configure the registry URL for all Kafka channels in SmallRye Reactive Messaging with a single property.
For Apicurio Registry serde, that is:

[source,properties]
----
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=... your Apicuio Registry URL...
----

For Confluent Schema Registry serde, that is:

[source,properties]
----
mp.messaging.connector.smallrye-kafka.schema.registry.url=... your Confluent Schema Registry URL...
----

== Shared registry
Expand Down
18 changes: 9 additions & 9 deletions docs/src/main/asciidoc/kafka-schema-registry-avro.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc
include::./attributes.adoc[]

This guide shows how your Quarkus application can use Apache Kafka, http://avro.apache.org/docs/current/[Avro] serialized
records, and connect to a schema registry (such as the https://docs.confluent.io/platform/current/schema-registry/index.html[Confluent Schema Registry] or https://www.apicur.io/registry/[Apicurio Registry].
records, and connect to a schema registry (such as the https://docs.confluent.io/platform/current/schema-registry/index.html[Confluent Schema Registry] or https://www.apicur.io/registry/[Apicurio Registry]).

If you are not familiar with Kafka and Kafka in Quarkus in particular, consider
first going through the xref:kafka.adoc[Using Apache Kafka with Reactive Messaging] guide.
Expand Down Expand Up @@ -52,19 +52,17 @@ include::includes/devtools/create-app.adoc[]
[TIP]
====
If you use Confluent Schema Registry, you don't need the `quarkus-apicurio-registry-avro` extension.
Instead, you need the following dependencies and the Confluent Maven repository added
to your build file:
Instead, you need the `quarkus-confluent-registry-avro` extension and a few more dependencies.
Also, you need to add the Confluent Maven repository to your build file:

[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"]
.pom.xml
----
<dependencies>
...
<!-- Quarkus extension for generating Java code from Avro schemas -->
<!-- with this, you don't have to use avro-maven-plugin -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-avro</artifactId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
</dependency>
<!-- Confluent registry libraries use JAX-RS client -->
<dependency>
Expand Down Expand Up @@ -109,8 +107,7 @@ repositories {
dependencies {
...

// Quarkus extension for generating Java code from Avro schemas
implementation("io.quarkus:quarkus-avro")
implementation("io.quarkus:quarkus-confluent-registry-avro")

// Confluent registry libraries use JAX-RS client
implementation("io.quarkus:quarkus-rest-client-reactive")
Expand Down Expand Up @@ -312,7 +309,10 @@ See xref:kafka-dev-services.adoc[Dev Services for Kafka] and xref:apicurio-regis
You might have noticed that we didn't configure the schema registry URL anywhere.
This is because Dev Services for Apicurio Registry configures all Kafka channels in SmallRye Reactive Messaging to use the automatically started registry instance.

There's no Dev Services support for Confluent Schema Registry.
Apicurio Registry, in addition to its native API, also exposes an endpoint that is API-compatible with Confluent Schema Registry.
Therefore, this automatic configuration works both for Apicurio Registry serde and Confluent Schema Registry serde.

However, note that there's no Dev Services support for running Confluent Schema Registry itself.
If you want to use a running instance of Confluent Schema Registry, configure its URL, together with the URL of a Kafka broker:

[source,properties]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,51 +337,10 @@ private void handleAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
// Avro - for both Confluent and Apicurio

// --- Confluent ---
if (QuarkusClassLoader.isClassPresentAtRuntime("io.confluent.kafka.serializers.KafkaAvroDeserializer")) {
reflectiveClass
.produce(new ReflectiveClassBuildItem(true, false,
"io.confluent.kafka.serializers.KafkaAvroDeserializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer"));

reflectiveClass
.produce(new ReflectiveClassBuildItem(true, false, false,
"io.confluent.kafka.serializers.context.NullContextNameStrategy"));

reflectiveClass
.produce(new ReflectiveClassBuildItem(true, true, false,
"io.confluent.kafka.serializers.subject.TopicNameStrategy",
"io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
"io.confluent.kafka.serializers.subject.RecordNameStrategy"));

reflectiveClass
.produce(new ReflectiveClassBuildItem(true, true, false,
"io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage",
"io.confluent.kafka.schemaregistry.client.rest.entities.Schema",
"io.confluent.kafka.schemaregistry.client.rest.entities.Config",
"io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference",
"io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString",
"io.confluent.kafka.schemaregistry.client.rest.entities.SchemaTypeConverter",
"io.confluent.kafka.schemaregistry.client.rest.entities.ServerClusterId",
"io.confluent.kafka.schemaregistry.client.rest.entities.SujectVersion"));

reflectiveClass
.produce(new ReflectiveClassBuildItem(true, true, false,
"io.confluent.kafka.schemaregistry.client.rest.entities.requests.CompatibilityCheckResponse",
"io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest",
"io.confluent.kafka.schemaregistry.client.rest.entities.requests.ModeGetResponse",
"io.confluent.kafka.schemaregistry.client.rest.entities.requests.ModeUpdateRequest",
"io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest",
"io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse"));
}

if (QuarkusClassLoader.isClassPresentAtRuntime(
"io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider")) {
serviceProviders
.produce(new ServiceProviderBuildItem(
"io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider",
"io.confluent.kafka.schemaregistry.client.security.basicauth.SaslBasicAuthCredentialProvider",
"io.confluent.kafka.schemaregistry.client.security.basicauth.UrlBasicAuthCredentialProvider",
"io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider"));
if (QuarkusClassLoader.isClassPresentAtRuntime("io.confluent.kafka.serializers.KafkaAvroDeserializer")
&& !capabilities.isPresent(Capability.CONFLUENT_REGISTRY_AVRO)) {
throw new RuntimeException(
"Confluent Avro classes detected, please use the quarkus-confluent-registry-avro extension");
}

// --- Apicurio Registry 1.x ---
Expand Down
2 changes: 1 addition & 1 deletion extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
<module>kafka-streams</module>
<module>mongodb-client</module>
<module>avro</module>
<module>apicurio-registry-avro</module>
<module>schema-registry</module>
<module>devservices</module>

<!-- Spring -->
Expand Down
49 changes: 49 additions & 0 deletions extensions/schema-registry/apicurio/avro/deployment/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro-parent</artifactId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-apicurio-registry-avro-deployment</artifactId>
<name>Quarkus - Apicurio Registry - Avro - Deployment</name>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-common-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-avro-deployment</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.quarkus.apicurio.registry.avro;

import io.quarkus.deployment.Feature;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;

public class ApicurioRegistryAvroProcessor {
@BuildStep
FeatureBuildItem feature() {
return new FeatureBuildItem(Feature.APICURIO_REGISTRY_AVRO);
}

@BuildStep
public void apicurioRegistryAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<ExtensionSslNativeSupportBuildItem> sslNativeSupport) {

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, false,
"io.apicurio.registry.serde.avro.AvroKafkaDeserializer",
"io.apicurio.registry.serde.avro.AvroKafkaSerializer"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, true,
"io.apicurio.registry.serde.strategy.SimpleTopicIdStrategy",
"io.apicurio.registry.serde.strategy.TopicIdStrategy",
"io.apicurio.registry.serde.avro.DefaultAvroDatumProvider",
"io.apicurio.registry.serde.avro.ReflectAvroDatumProvider",
"io.apicurio.registry.serde.avro.strategy.RecordIdStrategy",
"io.apicurio.registry.serde.avro.strategy.TopicRecordIdStrategy"));

reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, true,
"io.apicurio.registry.serde.DefaultSchemaResolver",
"io.apicurio.registry.serde.DefaultIdHandler",
"io.apicurio.registry.serde.Legacy4ByteIdHandler",
"io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider",
"io.apicurio.registry.serde.headers.DefaultHeadersHandler"));
}

@BuildStep
ExtensionSslNativeSupportBuildItem enableSslInNative() {
return new ExtensionSslNativeSupportBuildItem(Feature.APICURIO_REGISTRY_AVRO);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>quarkus-extensions-parent</artifactId>
<artifactId>quarkus-apicurio-registry-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
Expand Down
Loading