From f53668b35a8975674e08912c3f89a27b5087f755 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Szynkiewicz?= Date: Mon, 12 Apr 2021 19:28:32 +0200 Subject: [PATCH] Kafka, Schema Registry and Avro guide --- .../asciidoc/kafka-schema-registry-avro.adoc | 552 ++++++++++++++++++ 1 file changed, 552 insertions(+) create mode 100644 docs/src/main/asciidoc/kafka-schema-registry-avro.adoc diff --git a/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc b/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc new file mode 100644 index 00000000000000..3668e098dc6173 --- /dev/null +++ b/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc @@ -0,0 +1,552 @@ +//// +This guide is maintained in the main Quarkus repository +and pull requests should be submitted there: +https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc +//// += Quarkus - Using Apache Kafka with Schema Registry and Avro + +include::./attributes.adoc[] + +This guide demonstrates how your Quarkus application can use Schema Registry with Avro. + +The guide is based on the link:https://quarkus.io/blog/kafka-avro/[How to Use Kafka, Schema Registry and Avro with Quarkus] blog post. +If you are not familiar with Avro or the concept of Schema Registry, the post has a good introduction on the subject. + +If you are not familiar with Kafka and Kafka in Quarkus in particular, consider +first going through the link:kafka.adoc[Using Apache Kafka with Reactive Messaging] guide. + +== Prerequisites + +To complete this guide, you need: + +* less than 30 minutes +* an IDE +* JDK 1.8+ installed with `JAVA_HOME` configured appropriately +* Apache Maven {maven-version} +* Docker Compose to start a local Kafka cluster and Apicurio Registry +* GraalVM installed if you want to run in native mode. + + +== Architecture + +In this guide we are going to implement a REST resource, namely `MovieResource` that +will consume movie DTOs and put them in a Kafka topic. 
+
+Then, we will implement a consumer that will consume and collect messages from the same topic.
+The collected messages will then be exposed by another resource, `ConsumedMovieResource`, via
+Server-Sent Events.
+
+== Solution
+We recommend that you follow the instructions in the next sections and create the application step by step.
+However, you can go right to the completed example.
+
+Clone the Git repository: `git clone {quickstarts-clone-url}`, or download an {quickstarts-archive-url}[archive].
+
+The solution is located in the `kafka-avro-schema-quickstart` {quickstarts-tree-url}/kafka-avro-schema-quickstart[directory].
+
+== Creating the Maven Project
+
+First, we need a new project. Create a new project with the following command:
+
+[source,bash,subs=attributes+]
+----
+mvn io.quarkus:quarkus-maven-plugin:{quarkus-version}:create \
+    -DprojectGroupId=org.acme \
+    -DprojectArtifactId=kafka-avro-schema-quickstart \
+    -DclassName="org.acme.kafka.MovieResource" \
+    -Dpath="/movies" \
+    -Dextensions="resteasy-reactive,resteasy-reactive-jackson,rest-client,smallrye-reactive-messaging-kafka,avro"
+cd kafka-avro-schema-quickstart
+----
+
+Please note that even though our application will not explicitly use a REST client, it is used
+under the hood by the Apicurio registry and, at the moment, it is required as a dependency.
+
+Additionally, we need a serializer and a deserializer for Avro, specific to the schema registry we are going to use.
+Apicurio, which we use in this guide, has them in the following dependency:
+
+[source,xml]
+----
+<dependency>
+    <groupId>io.apicurio</groupId>
+    <artifactId>apicurio-registry-utils-serde</artifactId>
+    <version>1.2.2.Final</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.jboss.spec.javax.interceptor</groupId>
+            <artifactId>jboss-interceptors-api_1.2_spec</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+----
+
+
+== Avro schema
+Let's create a file called `src/main/avro/movie.avsc` with the schema for our record (Kafka message):
+[source,json]
+----
+{
+  "namespace": "org.acme.kafka.quarkus",
+  "type": "record",
+  "name": "Movie",
+  "fields": [
+    {
+      "name": "title",
+      "type": "string"
+    },
+    {
+      "name": "year",
+      "type": "int"
+    }
+  ]
+}
+----
+
+If you build the project with `mvn compile`, the `movie.avsc` file will get compiled to a `Movie.java` file
+placed in the `target/generated-sources/avsc` directory.
+
+If you run the project with `mvn compile quarkus:dev`, the changes you make to the schema file will be
+automatically applied to the generated Java files.
+
+== The `Movie` producer
+Having defined the record, we can now jump to implementing the `MovieResource`.
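If you are curious what the generated code looks like from the caller's side, Avro-generated classes expose a fluent builder and plain getters derived from the schema's field names. The sketch below uses a small hand-written stand-in class, not the real generated code, purely to illustrate the shape of that API; the real `Movie` class is generated into `target/generated-sources/avsc` and additionally implements Avro's `SpecificRecord`.

```java
// NOTE: a hand-written stand-in mirroring the shape of the Avro-generated
// Movie class, for illustration only.
public class MovieBuilderExample {

    public static class Movie {
        private String title;
        private int year;

        public static Builder newBuilder() {
            return new Builder();
        }

        public String getTitle() { return title; }
        public int getYear() { return year; }

        public static class Builder {
            private final Movie movie = new Movie();

            public Builder setTitle(String title) { movie.title = title; return this; }
            public Builder setYear(int year) { movie.year = year; return this; }
            public Movie build() { return movie; }
        }
    }

    public static void main(String[] args) {
        // The generated class is used the same way: fluent builder, then getters
        Movie movie = Movie.newBuilder()
                .setTitle("The Shawshank Redemption")
                .setYear(1994)
                .build();
        System.out.println(movie.getTitle() + " (" + movie.getYear() + ")");
    }
}
```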
+
+Let's open the `MovieResource`, inject an `Emitter` of `Movie` DTOs and implement a `@POST` method
+that consumes `Movie` and sends it through the `Emitter`:
+
+[source,java]
+----
+package org.acme.kafka;
+
+import org.acme.kafka.quarkus.Movie;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
+import org.jboss.logging.Logger;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Path("/movies")
+@Consumes(MediaType.APPLICATION_JSON)
+@Produces(MediaType.APPLICATION_JSON)
+public class MovieResource {
+
+    private static final Logger LOGGER = Logger.getLogger(MovieResource.class);
+
+    @Channel("movies") Emitter<Movie> emitter;
+
+    @POST
+    public Response enqueueMovie(Movie movie) {
+        LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
+        emitter.send(movie);
+        return Response.accepted().build();
+    }
+
+}
+----
+
+Now, configure the channel for the `Emitter` in `application.properties`:
+[source,properties]
+----
+# set the URL of the Apicurio Schema Registry, a global setting shared between producers and consumers
+mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/api
+
+# set the connector for the `movies` channel to `smallrye-kafka`
+mp.messaging.outgoing.movies.connector=smallrye-kafka
+
+# set the name of the corresponding Kafka topic to `movies`
+mp.messaging.outgoing.movies.topic=movies
+
+# set the serializer for the `movies` channel to the Avro serializer for Apicurio
+mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
+
+# Apicurio schema-specific settings:
+mp.messaging.outgoing.movies.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy
+mp.messaging.outgoing.movies.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
+mp.messaging.outgoing.movies.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
+----
+
+== The `Movie` consumer
+We already have the means to put `Movie` records on a Kafka topic; it's time to implement a consumer for them.
+
+Let's create `ConsumedMovieResource` that will consume `Movie` messages
+from the `movies-from-kafka` channel and expose them via Server-Sent Events:
+
+[source,java]
+----
+package org.acme.kafka;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.acme.kafka.quarkus.Movie;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.jboss.resteasy.annotations.SseElementType;
+
+import io.smallrye.mutiny.Multi;
+
+@ApplicationScoped
+@Path("/consumed-movies")
+public class ConsumedMovieResource {
+
+    @Inject
+    @Channel("movies-from-kafka")
+    Multi<Movie> movies;
+
+    @GET
+    @Produces(MediaType.SERVER_SENT_EVENTS)
+    @SseElementType("text/plain")
+    public Multi<String> stream() {
+        return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
+    }
+}
+----
+
+The last bit of the application's code is the configuration of the `movies-from-kafka` channel in
+`application.properties`:
+[source,properties]
+----
+# set the connector for the incoming channel to `smallrye-kafka`
+mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
+
+# set the topic name for the channel to `movies`
+mp.messaging.incoming.movies-from-kafka.topic=movies
+
+# set the deserializer for the `movies-from-kafka` channel to the Avro deserializer for Apicurio
+mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer
+
+# disable auto-commit, Reactive Messaging handles it itself
+mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
+
+mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
+mp.messaging.incoming.movies-from-kafka.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
+----
+
+
+== The infrastructure
+To use our application, we need Kafka and the Apicurio Schema Registry.
+The easiest way to get them running is to use `docker-compose` to start the appropriate containers.
+
+Create a `docker-compose.yaml` file at the root of the project with the following content:
+
+[source,yaml]
+----
+version: '2'
+
+services:
+
+  zookeeper:
+    image: strimzi/kafka:0.11.3-kafka-2.1.0
+    command: [
+      "sh", "-c",
+      "bin/zookeeper-server-start.sh config/zookeeper.properties"
+    ]
+    ports:
+      - "2181:2181"
+    environment:
+      LOG_DIR: /tmp/logs
+
+  kafka:
+    image: strimzi/kafka:0.11.3-kafka-2.1.0
+    command: [
+      "sh", "-c",
+      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
+    ]
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      LOG_DIR: "/tmp/logs"
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+
+  schema-registry:
+    image: apicurio/apicurio-registry-mem:1.2.2.Final
+    ports:
+      - 8081:8080
+    depends_on:
+      - kafka
+    environment:
+      QUARKUS_PROFILE: prod
+      KAFKA_BOOTSTRAP_SERVERS: localhost:9092
+      APPLICATION_ID: registry_id
+      APPLICATION_SERVER: localhost:9000
+----
+
+== Running the application
+Let's first start the Schema Registry and Kafka containers:
+[source,shell script]
+----
+docker-compose up
+----
+NOTE: To stop the containers, use `docker-compose down`.
You can also clean up
+the containers with `docker-compose rm`.
+
+Then, start the application:
+[source,shell script]
+----
+mvn compile quarkus:dev
+----
+
+
+In a second terminal, query the `ConsumedMovieResource` with `curl`:
+[source,shell script]
+----
+curl -N http://localhost:8080/consumed-movies
+----
+
+In a third one, post a few movies:
+[source,shell script]
+----
+curl --header "Content-Type: application/json" \
+  --request POST \
+  --data '{"title":"The Shawshank Redemption","year":1994}' \
+  http://localhost:8080/movies
+
+curl --header "Content-Type: application/json" \
+  --request POST \
+  --data '{"title":"The Godfather","year":1972}' \
+  http://localhost:8080/movies
+
+curl --header "Content-Type: application/json" \
+  --request POST \
+  --data '{"title":"The Dark Knight","year":2008}' \
+  http://localhost:8080/movies
+
+curl --header "Content-Type: application/json" \
+  --request POST \
+  --data '{"title":"12 Angry Men","year":1957}' \
+  http://localhost:8080/movies
+----
+
+Observe what is printed in the second terminal. You should see something along the lines of:
+[source]
+----
+data:'The Shawshank Redemption' from 1994
+
+data:'The Godfather' from 1972
+
+data:'The Dark Knight' from 2008
+
+data:'12 Angry Men' from 1957
+----
+
+
+== Testing the application
+
+=== Infrastructure for tests
+We will now use Testcontainers to set up Kafka and the Apicurio Schema Registry for tests.
+
+First, let's add test dependencies on Awaitility, Testcontainers and Strimzi to `pom.xml`:
+[source,xml]
+----
+    ...
+    <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>testcontainers</artifactId>
+        <scope>test</scope>
+    </dependency>
+    <dependency>
+        <groupId>io.strimzi</groupId>
+        <artifactId>strimzi-test-container</artifactId>
+        <version>0.22.1</version>
+        <scope>test</scope>
+        <exclusions>
+            <exclusion>
+                <groupId>org.apache.logging.log4j</groupId>
+                <artifactId>log4j-core</artifactId>
+            </exclusion>
+        </exclusions>
+    </dependency>
+    <dependency>
+        <groupId>org.awaitility</groupId>
+        <artifactId>awaitility</artifactId>
+        <scope>test</scope>
+    </dependency>
+----
+
+Now, let's define a link:getting-started-testing.adoc#quarkus-test-resource[QuarkusTestResourceLifecycleManager] that will create the appropriate
+containers:
+[source,java]
+----
+package org.acme.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.testcontainers.containers.GenericContainer;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import io.strimzi.StrimziKafkaContainer;
+
+public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
+
+    private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();
+
+    private GenericContainer<?> registry;
+
+    @Override
+    public Map<String, String> start() {
+        kafka.start();
+        registry = new GenericContainer<>("apicurio/apicurio-registry-mem:1.2.2.Final")
+                .withExposedPorts(8080)
+                .withEnv("QUARKUS_PROFILE", "prod")
+                .withEnv("KAFKA_BOOTSTRAP_SERVERS", kafka.getBootstrapServers())
+                .withEnv("APPLICATION_ID", "registry_id")
+                .withEnv("APPLICATION_SERVER", "localhost:9000");
+        registry.start();
+        Map<String, String> properties = new HashMap<>();
+        properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
+                "http://" + registry.getContainerIpAddress() + ":" + registry.getMappedPort(8080) + "/api");
+        properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
+        return properties;
+    }
+
+    @Override
+    public void stop() {
+        registry.stop();
+        kafka.close();
+    }
+}
+----
+
+=== The test
+In the test, we will send movies in a loop and check if the `ConsumedMovieResource` returns
+what we send.
+
+[source,java]
+----
+package org.acme.kafka;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.common.http.TestHTTPResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.restassured.http.ContentType;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.sse.SseEventSource;
+
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static io.restassured.RestAssured.given;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@QuarkusTest
+// register the class that sets up Testcontainers:
+@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
+public class MovieResourceTest {
+
+    @TestHTTPResource("/consumed-movies")
+    URI consumedMovies;
+
+    @Test
+    public void testHelloEndpoint() {
+        // create a client for `ConsumedMovieResource` and collect the consumed events in a list
+        Client client = ClientBuilder.newClient();
+        WebTarget target = client.target(consumedMovies);
+
+        List<String> received = new CopyOnWriteArrayList<>();
+
+        SseEventSource source = SseEventSource.target(target).build();
+        source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));
+
+        // in a separate thread, feed the `MovieResource`
+        ExecutorService movieSender = startSendingMovies();
+
+        source.open();
+
+        // check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect:
+        await().atMost(5000, MILLISECONDS).until(() -> received.size() >= 2);
+        assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
+                "'12 Angry Men' from 1957"));
+        source.close();
+
+        // shut down the executor that is feeding the `MovieResource`
+        movieSender.shutdown();
+    }
+
+    private ExecutorService startSendingMovies() {
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        executorService.execute(() -> {
+            while (true) {
+                given()
+                        .contentType(ContentType.JSON)
+                        .body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
+                        .when().post("/movies")
+                        .then()
+                        .statusCode(202);
+                given()
+                        .contentType(ContentType.JSON)
+                        .body("{\"title\":\"12 Angry Men\",\"year\":1957}")
+                        .when().post("/movies")
+                        .then()
+                        .statusCode(202);
+                try {
+                    Thread.sleep(200L);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+        });
+        return executorService;
+    }
+
+}
+----
+
+== Avro code generation details
+In this guide we used the Quarkus code generation mechanism to generate Java files
+from the Avro schema.
+
+Under the hood, the mechanism uses `org.apache.avro:avro-compiler`.
+
+You can use the following configuration properties to alter how it works:
+
+- `avro.codegen.[avsc|avdl|avpr].imports` - a list of files or directories that should be compiled first, thus making them
+importable by subsequently compiled schemas. Note that imported files should not reference each other. All paths should be relative to the `src/[main|test]/avro` directory. Passed as a comma-separated list.
+- `avro.codegen.stringType` - the Java type to use for Avro strings. May be one of `CharSequence`, `String` or
+`Utf8`. Defaults to `String`
+- `avro.codegen.createOptionalGetters` - enables generating the `getOptional...`
+methods that return an Optional of the requested type. Defaults to `false`
+- `avro.codegen.enableDecimalLogicalType` - determines whether to use Java classes for decimal types. Defaults to `false`
+- `avro.codegen.createSetters` - determines whether to create setters for the fields of the record.
+Defaults to `false`
+- `avro.codegen.gettersReturnOptional` - enables generating `get...` methods that
+return an Optional of the requested type. Defaults to `false`
+- `avro.codegen.optionalGettersForNullableFieldsOnly` - works in conjunction with the `gettersReturnOptional` option.
+If it is set, `Optional` getters will be generated only for fields that are nullable. If the field is mandatory,
+a regular getter will be generated. Defaults to `false`
+
+== Further reading
+This guide used SmallRye Reactive Messaging Kafka to produce Kafka messages and consume messages from Kafka. To learn more about
+this project, take a look at link:https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2/kafka/kafka.html[its guide].
\ No newline at end of file