+= Quarkus - Using Apache Kafka with Schema Registry and Avro
+This guide shows how your Quarkus application can use Apache Kafka, http://avro.apache.org/docs/current/[Avro] serialized
+records, and connect to Schema Registry (such as the https://docs.confluent.io/platform/current/schema-registry/index.html[Confluent Schema Registry] or https://www.apicur.io/registry/[Apicurio Schema Registry].
+If you are not familiar with Kafka and Kafka in Quarkus in particular, consider
+first going through the link:kafka.adoc[Using Apache Kafka with Reactive Messaging] guide.
+== Prerequisites
+To complete this guide, you need:
+* less than 30 minutes
+* an IDE
+* JDK 11+ installed with `JAVA_HOME` configured appropriately
+* Apache Maven {maven-version}
+* Docker Compose to start a local Kafka cluster and Apicurio Registry
+* GraalVM installed if you want to run in native mode.
+== Architecture
+In this guide we are going to implement a REST resource, namely `MovieResource` that
+will consume movie DTOs and put them in a Kafka topic.
+Then, we will implement a consumer that will consume and collect messages from the same topic.
+The collected messages will be then exposed by another resource, `ConsumedMovieResource`, via
+https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events[Server-Sent Events].
+The _Movies_ will be serialized and deserialized using Avro.
+The schema, describing the _Movie_, is stored in an Apicurio schema registry.
+The same concept applies if you are using the Confluent Avro _serde_ and Confluent Schema Registry.
+== Solution
+We recommend that you follow the instructions in the next sections and create the application step by step.
+However, you can go right to the completed example.
+Clone the Git repository: `git clone {quickstarts-clone-url}`, or download an {quickstarts-archive-url}[archive].
+The solution is located in the `kafka-avro-schema-quickstart` {quickstarts-tree-url}/kafka-quickstart-avro-schema[directory].
+== Creating the Maven Project
+First, we need a new project. Create a new project with the following command:
+mvn io.quarkus:quarkus-maven-plugin:{quarkus-version}:create \
+ -DprojectGroupId=org.acme \
+ -DprojectArtifactId=kafka-avro-schema-quickstart \
+ -DclassName="org.acme.kafka.MovieResource" \
+ -Dpath="/movies" \
+ -Dextensions="resteasy-reactive,resteasy-reactive-jackson,rest-client,smallrye-reactive-messaging-kafka,avro"
+cd kafka-avro-schema-quickstart
+NOTE: Even though our application will not use a REST client directly, it is used under the hood by
+the Apicurio serializer (to interact with the registry) and, at the moment it is required as a dependency.
+Additionally, we need a serializer and deserializer for Avro.
+In this guide, we will use the ones provided by Apicurio.
+ io.apicurio
+ apicurio-registry-utils-serde
+ 1.2.2.Final
+ org.jboss.spec.javax.interceptor
+ jboss-interceptors-api_1.2_spec
+If you use Confluent Schema Registry, you need the following dependencies and the confluent repository added
+to your `pom.xml`:
+ ...
+ com.google.guava
+ guava
+ 30.1.1-jre
+ org.checkerframework
+ checker-qual
+ io.confluent
+ kafka-avro-serializer
+ 5.5.0
+ io.swagger
+ swagger-core
+ jakarta.xml.bind
+ jakarta.xml.bind-api
+ jakarta.ws.rs
+ jakarta.ws.rs-api
+ confluent
+ https://packages.confluent.io/maven/
+ false
+== Avro schema
+Apache Avro is a data serialization system. Data structures are described using schemas.
+The first thing we need to do is to create a schema describing the `Movie` structure.
+Create a file called `src/main/avro/movie.avsc` with the schema for our record (Kafka message):
+ "namespace": "org.acme.kafka.quarkus",
+ "type": "record",
+ "name": "Movie",
+ "fields": [
+ {
+ "name": "title",
+ "type": "string"
+ },
+ {
+ "name": "year",
+ "type": "int"
+ }
+ ]
+If you build the project with `mvn compile` , the `movies.avsc` will get compiled to a `Movie.java` file
+placed in the `target/generated-sources/avsc` directory.
+Take a look at the https://avro.apache.org/docs/current/spec.html#schemas[Avro specification] to learn more about
+the Avro syntax and supported types.
+TIP: With Quarkus, no need to a specific plugin to process the Avro schema, this is all done for you!
+If you run the project with `mvn compile quarkus:dev`, the changes you do to the schema file will be
+automatically applied to the generated Java files.
+== The `Movie` producer
+Having defined the schema, we can now jump to implementing the `MovieResource`.
+Let's open the `MovieResource`, inject an Let's open the `MovieResource`, inject an https://quarkus.io/blog/reactive-messaging-emitter/[`Emitter`] of `Movie` DTO and implement a `@POST` method of `Movie` DTO and implement a `@POST` method
+that consumes `Movie` and sends it through the `Emitter`:
+package org.acme.kafka;
+import org.acme.kafka.quarkus.Movie;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
+import org.jboss.logging.Logger;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+public class MovieResource {
+ private static final Logger LOGGER = Logger.getLogger(MovieResource.class);
+ @Channel("movies") Emitter emitter;
+ public Response enqueueMovie(Movie movie) {
+ LOGGER.infof("Sending movie %s to Kafka",
+ movie.getTitle()
+ );
+ emitter.send(movie);
+ return Response.accepted().build();
+ }
+Now, we need to _map_ the `movies` channel (the `Emitter` emits to this channel) to a Kafka topic.
+To achieve this, edit the `application.properties` file, and add the following content:
+# set the URL of the Apicurio Schema Registry, a global setting shared between producers and consumers
+# set the connector to use for the `movies` channel to smallrye-kafka
+# the name of the corresponding Kafka topic to `movies`
+# set the serializer for the `movies` channel to the Avro Serializer for Apicurio
+# Apicurio schema specific settings:
+== The `Movie` consumer
+So, we can write records into Kafka containing our `Movie` data.
+That data is serialized using Avro.
+Now, it's time to implement a consumer for them.
+Let's create `ConsumedMovieResource` that will consume `Movie` messages
+from the `movies-from-kafka` channel and will expose it via Server-Sent Events:
+package org.acme.kafka;
+import javax.enterprise.context.ApplicationScoped;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import org.acme.kafka.quarkus.Movie;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.jboss.resteasy.annotations.SseElementType;
+import io.smallrye.mutiny.Multi;
+public class ConsumedMovieResource {
+ @Channel("movies-from-kafka")
+ Multi movies;
+ @GET
+ @Produces(MediaType.SERVER_SENT_EVENTS)
+ @SseElementType(MediaType.TEXT_PLAIN)
+ public Multi stream() {
+ return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
+ }
+The last bit of the application's code is the configuration of the `movies-from-kafka` channel in
+# set the connector for the incoming channel to `smallrye-kafka`
+# set the topic name for the channel to `movies`
+# set the deserializer for the `movies-from-kafka` channel to the Avro Deserializer for Apicurio
+# disable auto-commit, Reactive Messaging handles it itself
+== The infrastructure
+To use our application, we need Kafka and Apicurio Schema Registry.
+The easiest way to get them running is to use `docker-compose` to start the appropriate containers.
+Create a `docker-compose.yaml` file at the root of the project with the following content:
+version: '2'
+ zookeeper:
+ image: strimzi/kafka:0.20.1-kafka-2.5.0
+ command: [
+ "sh", "-c",
+ "bin/zookeeper-server-start.sh config/zookeeper.properties"
+ ]
+ ports:
+ - "2181:2181"
+ environment:
+ LOG_DIR: /tmp/logs
+ kafka:
+ image: strimzi/kafka:0.20.1-kafka-2.5.0
+ command: [
+ "sh", "-c",
+ "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
+ ]
+ depends_on:
+ - zookeeper
+ ports:
+ - "9092:9092"
+ environment:
+ LOG_DIR: "/tmp/logs"
+ schema-registry:
+ image: apicurio/apicurio-registry-mem:1.3.2.Final
+ ports:
+ - 8081:8080
+ depends_on:
+ - kafka
+ environment:
+ APPLICATION_ID: registry_id
+ APPLICATION_SERVER: localhost:9000
+== Running the application
+Let's first start the Schema Registry and Kafka containers:
+[source,shell script]
+docker-compose up
+NOTE: To stop the containers, use `docker-compose down`. You can also clean up
+the containers with `docker-compose rm`
+Then, start the application:
+[source,shell script]
+mvn compile quarkus:dev
+In the second terminal, query the `ConsumedMovieResource` resource with `curl`:
+[source,shell script]
+curl -N http://localhost:8080/consumed-movies
+In the third one, post a few movies:
+[source,shell script]
+curl --header "Content-Type: application/json" \
+ --request POST \
+ --data '{"title":"The Shawshank Redemption","year":1994}' \
+ http://localhost:8080/movies
+curl --header "Content-Type: application/json" \
+ --request POST \
+ --data '{"title":"The Godfather","year":1972}' \
+ http://localhost:8080/movies
+curl --header "Content-Type: application/json" \
+ --request POST \
+ --data '{"title":"The Dark Knight","year":2008}' \
+ http://localhost:8080/movies
+curl --header "Content-Type: application/json" \
+ --request POST \
+ --data '{"title":"12 Angry Men","year":1957}' \
+ http://localhost:8080/movies
+Observe what is printed in the second terminal. You should see something along the lines of:
+data:'The Shawshank Redemption' from 1994
+data:'The Godfather' from 1972
+data:'The Dark Knight' from 2008
+data:'12 Angry Men' from 1957
+== Building a native executable
+Building a native executable
+You can build a native executable with the usual command ./mvnw package -Dnative.
+Running it is as simple as executing `./target/kafka-avro-schema-quickstart-1.0.0-SNAPSHOT-runner`.
+== Testing the application
+=== Infrastructure for tests
+We will now use Testcontainers to set up Kafka and Apicurio Schema Registry for tests.
+First, let's add test dependencies on Awaitility and Strimzi to `pom.xml`, Testcontainers will be pulled
+in transitively by `strimzi-test-container`:
+ ...
+ io.strimzi
+ strimzi-test-container
+ 0.22.1
+ test
+ org.apache.logging.log4j
+ log4j-core
+ org.awaitility
+ awaitility
+ test
+Now, let's define a link:getting-started-testing.adoc#quarkus-test-resource[QuarkusTestResourceLifecycleManager] that will create the appropriate
+package org.acme.kafka;
+import java.util.HashMap;
+import java.util.Map;
+import org.testcontainers.containers.GenericContainer;
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import io.strimzi.StrimziKafkaContainer;
+public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
+ private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();
+ private GenericContainer> registry;
+ @Override
+ public Map start() {
+ kafka.start();
+ registry = new GenericContainer<>("apicurio/apicurio-registry-mem:1.2.2.Final")
+ .withExposedPorts(8080)
+ .withEnv("QUARKUS_PROFILE", "prod")
+ .withEnv("KAFKA_BOOTSTRAP_SERVERS", kafka.getBootstrapServers())
+ .withEnv("APPLICATION_ID", "registry_id")
+ .withEnv("APPLICATION_SERVER", "localhost:9000");
+ registry.start();
+ Map properties = new HashMap<>();
+ properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
+ "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/api");
+ properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
+ return properties;
+ }
+ @Override
+ public void stop() {
+ registry.stop();
+ kafka.stop();
+ }
+=== The test
+In the test, we will send movies in a loop and check if the `ConsumedMovieResource` returns
+what we send.
+package org.acme.kafka;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.common.http.TestHTTPResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.restassured.http.ContentType;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.sse.SseEventSource;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import static io.restassured.RestAssured.given;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.equalToIgnoringCase;
+// register the class that sets up Testcontainers:
+public class MovieResourceTest {
+ @TestHTTPResource("/consumed-movies")
+ URI consumedMovies;
+ @Test
+ public void testHelloEndpoint() {
+ // create a client for `ConsumedMovieResource` and collect the consumed resources in a list
+ Client client = ClientBuilder.newClient();
+ WebTarget target = client.target(consumedMovies);
+ List received = new CopyOnWriteArrayList<>();
+ SseEventSource source = SseEventSource.target(target).build();
+ source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));
+ // in a separate thread, feed the `MovieResource`
+ ExecutorService movieSender = startSendingMovies();
+ source.open();
+ // check if, after at most 5 seconds, we have at last 2 items collected, and they are what we expect:
+ await().atMost(5000, MILLISECONDS).until(() -> received.size() >= 2);
+ assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
+ "'12 Angry Men' from 1957"));
+ source.close();
+ // shutdown the executor that is feeding the `MovieResource`
+ movieSender.shutdown();
+ }
+ private ExecutorService startSendingMovies() {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService
+ .execute(
+ () -> {
+ while (true) {
+ given()
+ .contentType(ContentType.JSON)
+ .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
+ .when().post("/movies")
+ .then()
+ .statusCode(202);
+ given()
+ .contentType(ContentType.JSON)
+ .body("{\"title\":\"12 Angry Men\",\"year\":1957}")
+ .when().post("/movies")
+ .then()
+ .statusCode(202);
+ try {
+ Thread.sleep(200L);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ );
+ return executorService;
+ }
+NOTE: We modified the `MovieResourceTest` that was generated together with the project. This test class has a
+subclass, `NativeMovieResourceIT`, that runs the same test against the native executable.
+To run it, execute `mvn package verify -Dnative`, or `mvn clean install -Dnative`
+== Avro code generation details
+In this guide we used the Quarkus code generation mechanism to generate Java files
+from Avro schema.
+Under the hood, the mechanism uses `org.apache.avro:avro-compiler`.
+You can use the following configuration properties to alter how it works:
+- `avro.codegen.[avsc|avdl|avpr].imports` - a list of files or directories that should be compiled first thus making them
+importable by subsequently compiled schemas. Note that imported files should not reference each other. All paths should be relative
+to the `src/[main|test]/avro` directory. Passed as a comma-separated list.
+- `avro.codegen.stringType` - the Java type to use for Avro strings. May be one of `CharSequence`, `String` or
+`Utf8`. Defaults to `String`
+- `avro.codegen.createOptionalGetters` - enables generating the `getOptional...`
+methods that return an Optional of the requested type. Defaults to `false`
+- `avro.codegen.enableDecimalLogicalType` - determines whether to use Java classes for decimal types, defaults to `false`
+- `avro.codegen.createSetters` - determines whether to create setters for the fields of the record.
+Defaults to `false`
+- `avro.codegen.gettersReturnOptional` - enables generating `get...` methods that
+return an Optional of the requested type. Defaults to `false`
+- `avro.codegen.optionalGettersForNullableFieldsOnly`, works in conjunction with `gettersReturnOptional` option.
+If it is set, `Optional` getters will be generated only for fields that are nullable. If the field is mandatory,
+regular getter will be generated. Defaults to `false`
+== Further reading
+* link:https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.9/kafka/kafka.html[SmallRye Reactive Messaging Kafka] documentation
+* link:https://quarkus.io/blog/kafka-avro/[How to Use Kafka, Schema Registry and Avro with Quarkus] - a blog post on which
+the guide is based. It gives a good introduction to Avro and the concept of Schema Registry
\ No newline at end of file