From e24115b0822cc34fae6e23d677ec697b3c142ea6 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Wed, 20 Feb 2019 21:48:58 +1100 Subject: [PATCH] Add Kafka extension --- bom/pom.xml | 13 +++ build-parent/pom.xml | 69 +++++++++++++++ .../deployment}/pom.xml | 13 +-- .../reactivemessaging/KafkaProcessor.java | 87 +++++++++++++++++++ extensions/kafka-client/pom.xml | 21 +++++ extensions/kafka-client/runtime/pom.xml | 50 +++++++++++ .../kafka/graal/SubstituteSnappy.java | 0 extensions/pom.xml | 2 + .../KafkaCodecProcessor.java | 54 ------------ .../deployment/pom.xml | 60 +++++++++++++ .../KafkaCodecProcessor.java | 35 ++++++++ .../smallrye-reactive-messaging-kafka/pom.xml | 21 +++++ .../runtime}/pom.xml | 31 +++++-- .../smallrye-reactive-messaging/pom.xml | 3 - integration-tests/main/pom.xml | 21 +++++ .../example/kafka/KafkaConsumerManager.java | 43 +++++++++ .../shamrock/example/kafka/KafkaEndpoint.java | 26 ++++++ .../example/kafka/KafkaProducerManager.java | 40 +++++++++ .../example/test/KafkaConsumerITCase.java | 10 +++ .../example/test/KafkaConsumerTest.java | 40 +++++++++ .../example/test/KafkaProducerITCase.java | 10 +++ .../example/test/KafkaProducerTest.java | 45 ++++++++++ .../example/test/KafkaTestResource.java | 37 ++++++++ 23 files changed, 659 insertions(+), 72 deletions(-) rename extensions/{reactive-messaging/kafka-connector-deployment => kafka-client/deployment}/pom.xml (75%) create mode 100644 extensions/kafka-client/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaProcessor.java create mode 100644 extensions/kafka-client/pom.xml create mode 100644 extensions/kafka-client/runtime/pom.xml rename extensions/{reactive-messaging/kafka-connector-runtime => kafka-client/runtime}/src/main/java/io/smallrye/reactive/kafka/graal/SubstituteSnappy.java (100%) delete mode 100644 extensions/reactive-messaging/kafka-connector-deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java create mode 100644 extensions/smallrye-reactive-messaging-kafka/deployment/pom.xml create mode 100644 extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java create mode 100644 extensions/smallrye-reactive-messaging-kafka/pom.xml rename extensions/{reactive-messaging/kafka-connector-runtime => smallrye-reactive-messaging-kafka/runtime}/pom.xml (75%) create mode 100644 integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaConsumerManager.java create mode 100644 integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaEndpoint.java create mode 100644 integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaProducerManager.java create mode 100644 integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerITCase.java create mode 100644 integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerTest.java create mode 100644 integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerITCase.java create mode 100644 integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerTest.java create mode 100644 integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaTestResource.java diff --git a/bom/pom.xml b/bom/pom.xml index 3d5878d58bd9c..573d146fc3ab0 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -67,6 +67,13 @@ provided + + org.jboss.shamrock + shamrock-kafka-client-deployment + ${project.version} + provided + + org.jboss.shamrock shamrock-smallrye-health-deployment @@ -101,6 +108,12 @@ ${project.version} provided + + org.jboss.shamrock + shamrock-smallrye-reactive-messaging-kafka-deployment + ${project.version} + provided + org.jboss.shamrock diff --git a/build-parent/pom.xml b/build-parent/pom.xml index 74f55fe35b14e..ac5ba7d975198 100644 --- a/build-parent/pom.xml +++ b/build-parent/pom.xml @@ -125,6 +125,10 @@ 1.10.6 3.3.2.Final 0.0.1 + 1.1.0 + 1.1.0 + 0.8.3.Final + 3.4.10 @@ -229,6 +233,16 @@ shamrock-jdbc-mariadb-runtime ${project.version} + + org.jboss.shamrock + shamrock-kafka-client-deployment + ${project.version} + + + org.jboss.shamrock + shamrock-kafka-client-runtime + ${project.version} + org.jboss.shamrock shamrock-smallrye-health-deployment @@ -513,6 +527,16 @@ shamrock-smallrye-reactive-messaging-runtime ${project.version} + + org.jboss.shamrock + shamrock-smallrye-reactive-messaging-kafka-deployment + ${project.version} + + + org.jboss.shamrock + shamrock-smallrye-reactive-messaging-kafka-runtime + ${project.version} + org.jboss.shamrock shamrock-spring-di-deployment @@ -618,6 +642,11 @@ jackson-databind ${jackson.version} + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + com.sun.activation jakarta.activation @@ -707,6 +736,30 @@ + + + io.debezium + debezium-core + ${debezium.version} + test + + + io.debezium + debezium-core + ${debezium.version} + test-jar + test + + + org.scala-lang + scala-reflect + 2.12.2 + + + org.scala-lang + scala-library + 2.12.2 + io.undertow undertow-servlet @@ -858,6 +911,22 @@ httpcore ${httpcore.version} + + org.apache.kafka + kafka-clients + ${kafka-clients.version} + + + org.apache.kafka + kafka_2.12 + ${kafka2.version} + test + + + org.apache.zookeeper + zookeeper + ${zookeeper.version} + org.apache.logging.log4j log4j-api diff --git a/extensions/reactive-messaging/kafka-connector-deployment/pom.xml b/extensions/kafka-client/deployment/pom.xml similarity index 75% rename from extensions/reactive-messaging/kafka-connector-deployment/pom.xml rename to extensions/kafka-client/deployment/pom.xml index ecc22266a4f41..88b7309443362 100644 --- a/extensions/reactive-messaging/kafka-connector-deployment/pom.xml +++ b/extensions/kafka-client/deployment/pom.xml @@ -6,11 +6,11 @@ org.jboss.shamrock - shamrock-reactive-messaging + shamrock-kafka-client 1.0.0.Alpha1-SNAPSHOT - shamrock-reactive-messaging-kafka-connector-deployment + shamrock-kafka-client-deployment @@ -19,15 +19,8 @@ org.jboss.shamrock - shamrock-arc-deployment + shamrock-kafka-client-runtime - - org.jboss.shamrock - shamrock-reactive-messaging-kafka-connector-runtime - - ${project.version} - - org.jboss.shamrock shamrock-junit5-internal diff --git a/extensions/kafka-client/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaProcessor.java new file mode 100644 index 0000000000000..6ace8940bce5d --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaProcessor.java @@ -0,0 +1,87 @@ +package org.jboss.shamrock.reactivemessaging; + +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.ByteBufferDeserializer; +import org.apache.kafka.common.serialization.ByteBufferSerializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.DoubleDeserializer; +import org.apache.kafka.common.serialization.DoubleSerializer; +import org.apache.kafka.common.serialization.FloatDeserializer; +import org.apache.kafka.common.serialization.FloatSerializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.ShortDeserializer; +import org.apache.kafka.common.serialization.ShortSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.jboss.jandex.ClassInfo; +import org.jboss.jandex.DotName; +import org.jboss.shamrock.deployment.annotations.BuildProducer; +import org.jboss.shamrock.deployment.annotations.BuildStep; +import org.jboss.shamrock.deployment.builditem.CombinedIndexBuildItem; +import org.jboss.shamrock.deployment.builditem.substrate.ReflectiveClassBuildItem; + +import java.util.Collection; + +public class KafkaProcessor { + + static final Class[] BUILT_INS = { + //serializers + ShortSerializer.class, + DoubleSerializer.class, + LongSerializer.class, + BytesSerializer.class, + ByteArraySerializer.class, + IntegerSerializer.class, + ByteBufferSerializer.class, + StringSerializer.class, + FloatSerializer.class, + + //deserializers + ShortDeserializer.class, + DoubleDeserializer.class, + LongDeserializer.class, + BytesDeserializer.class, + ByteArrayDeserializer.class, + IntegerDeserializer.class, + ByteBufferDeserializer.class, + StringDeserializer.class, + FloatDeserializer.class, + }; + + @BuildStep + public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer reflectiveClass) { + Collection serializers = indexBuildItem.getIndex() + .getAllKnownSubclasses(DotName.createSimple(Serializer.class.getName())); + Collection deserializers = indexBuildItem.getIndex() + .getAllKnownSubclasses(DotName.createSimple(Deserializer.class.getName())); + + for(Class i : BUILT_INS) { + reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, i.getName())); + } + + for (ClassInfo s : serializers) { + reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, s.toString())); + } + + for (ClassInfo s : deserializers) { + reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, s.toString())); + } + + // Also + // Kafka has is heavily using reflection - at least these 2 classes are instantiated + // The first to produce + // The second to consume + reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, DefaultPartitioner.class.getName())); + reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, RangeAssignor.class.getName())); + + } +} diff --git a/extensions/kafka-client/pom.xml b/extensions/kafka-client/pom.xml new file mode 100644 index 0000000000000..d2f68a3e78563 --- /dev/null +++ b/extensions/kafka-client/pom.xml @@ -0,0 +1,21 @@ + + + + shamrock-build-parent + org.jboss.shamrock + 1.0.0.Alpha1-SNAPSHOT + ../../build-parent/pom.xml + + + 4.0.0 + shamrock-kafka-client + Shamrock - Kafka - Client + pom + + + deployment + runtime + + diff --git a/extensions/kafka-client/runtime/pom.xml b/extensions/kafka-client/runtime/pom.xml new file mode 100644 index 0000000000000..81583f78f4a1e --- /dev/null +++ b/extensions/kafka-client/runtime/pom.xml @@ -0,0 +1,50 @@ + + + 4.0.0 + + + org.jboss.shamrock + shamrock-kafka-client + 1.0.0.Alpha1-SNAPSHOT + + + shamrock-kafka-client-runtime + + + + + org.apache.kafka + kafka-clients + + + + com.oracle.substratevm + svm + + + + + + + + maven-dependency-plugin + + + maven-compiler-plugin + + + + org.jboss.shamrock + shamrock-extension-processor + ${project.version} + + + + + + + + + \ No newline at end of file diff --git a/extensions/reactive-messaging/kafka-connector-runtime/src/main/java/io/smallrye/reactive/kafka/graal/SubstituteSnappy.java b/extensions/kafka-client/runtime/src/main/java/io/smallrye/reactive/kafka/graal/SubstituteSnappy.java similarity index 100% rename from extensions/reactive-messaging/kafka-connector-runtime/src/main/java/io/smallrye/reactive/kafka/graal/SubstituteSnappy.java rename to extensions/kafka-client/runtime/src/main/java/io/smallrye/reactive/kafka/graal/SubstituteSnappy.java diff --git a/extensions/pom.xml b/extensions/pom.xml index b15386e7f97af..9ac628f9b6660 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -57,6 +57,7 @@ netty reactive-streams-operators smallrye-reactive-messaging + smallrye-reactive-messaging-kafka narayana-jta @@ -65,6 +66,7 @@ hibernate-orm hibernate-validator panache + kafka-client spring-di diff --git a/extensions/reactive-messaging/kafka-connector-deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java b/extensions/reactive-messaging/kafka-connector-deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java deleted file mode 100644 index 8cffeff3ee269..0000000000000 --- a/extensions/reactive-messaging/kafka-connector-deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.jboss.shamrock.reactivemessaging; - -import io.vertx.kafka.client.serialization.JsonObjectDeserializer; -import io.vertx.kafka.client.serialization.JsonObjectSerializer; -import org.apache.kafka.clients.consumer.RangeAssignor; -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.jboss.jandex.ClassInfo; -import org.jboss.jandex.DotName; -import org.jboss.shamrock.deployment.annotations.BuildProducer; -import org.jboss.shamrock.deployment.annotations.BuildStep; -import org.jboss.shamrock.deployment.builditem.CombinedIndexBuildItem; -import org.jboss.shamrock.deployment.builditem.substrate.ReflectiveClassBuildItem; - -import java.util.Collection; - -public class KafkaCodecProcessor { - - @BuildStep - public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer reflectiveClass) { - Collection serializers = indexBuildItem.getIndex() - .getAllKnownSubclasses(DotName.createSimple(Serializer.class.getName())); - Collection deserializers = indexBuildItem.getIndex() - .getAllKnownSubclasses(DotName.createSimple(Deserializer.class.getName())); - - // TODO For some reason it does not seem to work: - - for (ClassInfo s : serializers) { - reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, s.toString())); - } - - for (ClassInfo s : deserializers) { - reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, s.toString())); - } - - // Explicit list them because of issue mentioned above ^ - reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, StringSerializer.class.getName())); - reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, JsonObjectSerializer.class.getName())); - reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, StringDeserializer.class.getName())); - reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, JsonObjectDeserializer.class.getName())); - - - // Also - // Kafka has is heavily using reflection - at least these 2 classes are instantiated - // The first to produce - // The second to consume - reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, DefaultPartitioner.class.getName())); - reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, RangeAssignor.class.getName())); - - } -} diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/pom.xml b/extensions/smallrye-reactive-messaging-kafka/deployment/pom.xml new file mode 100644 index 0000000000000..74275b4b017cc --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/pom.xml @@ -0,0 +1,60 @@ + + + 4.0.0 + + + org.jboss.shamrock + shamrock-smallrye-reactive-messaging-kafka + 1.0.0.Alpha1-SNAPSHOT + + + shamrock-smallrye-reactive-messaging-kafka-deployment + + + + org.jboss.shamrock + shamrock-core-deployment + + + org.jboss.shamrock + shamrock-arc-deployment + + + org.jboss.shamrock + shamrock-kafka-client-deployment + + + org.jboss.shamrock + shamrock-smallrye-reactive-messaging-kafka-runtime + + + org.jboss.shamrock + shamrock-vertx-deployment + + + + org.jboss.shamrock + shamrock-junit5-internal + + + + + + + maven-compiler-plugin + + + + org.jboss.shamrock + shamrock-extension-processor + ${project.version} + + + + + + + + \ No newline at end of file diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java new file mode 100644 index 0000000000000..8add59c3a33a6 --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java @@ -0,0 +1,35 @@ +package org.jboss.shamrock.reactivemessaging; + +import org.jboss.shamrock.deployment.annotations.BuildProducer; +import org.jboss.shamrock.deployment.annotations.BuildStep; +import org.jboss.shamrock.deployment.builditem.CombinedIndexBuildItem; +import org.jboss.shamrock.deployment.builditem.substrate.ReflectiveClassBuildItem; + +import io.vertx.kafka.client.serialization.BufferDeserializer; +import io.vertx.kafka.client.serialization.BufferSerializer; +import io.vertx.kafka.client.serialization.JsonArrayDeserializer; +import io.vertx.kafka.client.serialization.JsonArraySerializer; +import io.vertx.kafka.client.serialization.JsonObjectDeserializer; +import io.vertx.kafka.client.serialization.JsonObjectSerializer; + +public class KafkaCodecProcessor { + + static final Class[] BUILT_INS = { + JsonObjectSerializer.class, + BufferSerializer.class, + JsonArraySerializer.class, + + + JsonObjectDeserializer.class, + BufferDeserializer.class, + JsonArrayDeserializer.class, + }; + + @BuildStep + public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer reflectiveClass) { + + for (Class s : BUILT_INS) { + reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, s.toString())); + } + } +} diff --git a/extensions/smallrye-reactive-messaging-kafka/pom.xml b/extensions/smallrye-reactive-messaging-kafka/pom.xml new file mode 100644 index 0000000000000..711717e1c549c --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/pom.xml @@ -0,0 +1,21 @@ + + + + shamrock-build-parent + org.jboss.shamrock + 1.0.0.Alpha1-SNAPSHOT + ../../build-parent/pom.xml + + + 4.0.0 + shamrock-smallrye-reactive-messaging-kafka + Shamrock - SmallRye Reactive Messaging - Kafka + pom + + + deployment + runtime + + diff --git a/extensions/reactive-messaging/kafka-connector-runtime/pom.xml b/extensions/smallrye-reactive-messaging-kafka/runtime/pom.xml similarity index 75% rename from extensions/reactive-messaging/kafka-connector-runtime/pom.xml rename to extensions/smallrye-reactive-messaging-kafka/runtime/pom.xml index 7181a77112b2d..7d59ea8cc8f44 100644 --- a/extensions/reactive-messaging/kafka-connector-runtime/pom.xml +++ b/extensions/smallrye-reactive-messaging-kafka/runtime/pom.xml @@ -6,13 +6,17 @@ org.jboss.shamrock - shamrock-reactive-messaging + shamrock-smallrye-reactive-messaging-kafka 1.0.0.Alpha1-SNAPSHOT - shamrock-reactive-messaging-kafka-connector-runtime + shamrock-smallrye-reactive-messaging-kafka-runtime + + org.jboss.shamrock + shamrock-kafka-client-runtime + io.smallrye.reactive smallrye-reactive-messaging-kafka @@ -44,7 +48,6 @@ org.apache.kafka kafka-clients - 1.1.0 @@ -74,10 +77,9 @@ - org.jboss.shamrock - shamrock-reactive-streams-operators-runtime + shamrock-smallrye-reactive-streams-operators-runtime io.smallrye.reactive @@ -96,6 +98,25 @@ + + + + maven-dependency-plugin + + + maven-compiler-plugin + + + + org.jboss.shamrock + shamrock-extension-processor + ${project.version} + + + + + + \ No newline at end of file diff --git a/extensions/smallrye-reactive-messaging/pom.xml b/extensions/smallrye-reactive-messaging/pom.xml index 9aec40003078e..8331e2ecb9c5a 100644 --- a/extensions/smallrye-reactive-messaging/pom.xml +++ b/extensions/smallrye-reactive-messaging/pom.xml @@ -17,8 +17,5 @@ deployment runtime - - kafka-connector-deployment - kafka-connector-runtime diff --git a/integration-tests/main/pom.xml b/integration-tests/main/pom.xml index ba9be05cc8a58..7d9319d9aba74 100644 --- a/integration-tests/main/pom.xml +++ b/integration-tests/main/pom.xml @@ -154,6 +154,11 @@ shamrock-integration-test-common-jpa-entities ${project.version} + + org.jboss.shamrock + shamrock-kafka-client-deployment + provided + @@ -171,6 +176,22 @@ rest-assured test + + io.debezium + debezium-core + test + + + io.debezium + debezium-core + test-jar + test + + + org.apache.kafka + kafka_2.12 + test + diff --git a/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaConsumerManager.java b/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaConsumerManager.java new file mode 100644 index 0000000000000..9cedd92dd6b18 --- /dev/null +++ b/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaConsumerManager.java @@ -0,0 +1,43 @@ +package org.jboss.shamrock.example.kafka; + +import java.util.Collections; +import java.util.Properties; + +import javax.annotation.PostConstruct; +import javax.enterprise.context.ApplicationScoped; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; + +@ApplicationScoped +public class KafkaConsumerManager { + + public static KafkaConsumer createConsumer() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList("test-consumer")); + return consumer; + } + + private int count; + private Consumer consumer; + + @PostConstruct + public void create() { + consumer = createConsumer(); + } + + public String receive() { + return consumer.poll(10000).iterator().next().value(); + } + +} diff --git a/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaEndpoint.java b/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaEndpoint.java new file mode 100644 index 0000000000000..7b7ea6e18c805 --- /dev/null +++ b/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaEndpoint.java @@ -0,0 +1,26 @@ +package org.jboss.shamrock.example.kafka; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; + +@Path("/kafka") +public class KafkaEndpoint { + + @Inject + KafkaProducerManager manager; + + @Inject + KafkaConsumerManager consumer; + + @POST + public void post(String message) { + manager.send(message); + } + + @GET + public String get() { + return consumer.receive(); + } +} diff --git a/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaProducerManager.java b/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaProducerManager.java new file mode 100644 index 0000000000000..82dddcb7834ce --- /dev/null +++ b/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaProducerManager.java @@ -0,0 +1,40 @@ +package org.jboss.shamrock.example.kafka; + +import java.util.Properties; + +import javax.annotation.PostConstruct; +import javax.enterprise.context.ApplicationScoped; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +@ApplicationScoped +public class KafkaProducerManager { + + public static Producer createProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "test"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return new KafkaProducer(props); + } + + private int count; + private Producer producer; + + @PostConstruct + public void create() { + producer = createProducer(); + } + + public void send(String message) { + producer.send(new ProducerRecord<>("test", count++, message)); + producer.flush(); + } + +} diff --git a/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerITCase.java b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerITCase.java new file mode 100644 index 0000000000000..a52efc9956b0d --- /dev/null +++ b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerITCase.java @@ -0,0 +1,10 @@ +package org.jboss.shamrock.example.test; + +import org.jboss.shamrock.test.junit.SubstrateTest; + +@SubstrateTest +public class KafkaConsumerITCase extends KafkaConsumerTest { + + + +} diff --git a/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerTest.java b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerTest.java new file mode 100644 index 0000000000000..d6bd1b346ae70 --- /dev/null +++ b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerTest.java @@ -0,0 +1,40 @@ +package org.jboss.shamrock.example.test; + +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.hamcrest.Matchers; +import org.jboss.shamrock.test.common.ShamrockTestResource; +import org.jboss.shamrock.test.junit.ShamrockTest; +import org.junit.jupiter.api.Test; + +import io.restassured.RestAssured; + +@ShamrockTestResource(KafkaTestResource.class) +@ShamrockTest +public class KafkaConsumerTest { + + + public static Producer createProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-consumer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return new KafkaProducer(props); + } + + @Test + public void test() throws Exception { + Producer consumer = createProducer(); + consumer.send(new ProducerRecord<>("test-consumer", 1, "hi world")); + RestAssured.when().get("/kafka").then().body(Matchers.is("hi world")); + } + + +} diff --git a/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerITCase.java b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerITCase.java new file mode 100644 index 0000000000000..a68a64bc2dca7 --- /dev/null +++ b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerITCase.java @@ -0,0 +1,10 @@ +package org.jboss.shamrock.example.test; + +import org.jboss.shamrock.test.junit.SubstrateTest; + +@SubstrateTest +public class KafkaProducerITCase extends KafkaProducerTest { + + + +} diff --git a/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerTest.java b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerTest.java new file mode 100644 index 0000000000000..8f5da6ff2b3a7 --- /dev/null +++ b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerTest.java @@ -0,0 +1,45 @@ +package org.jboss.shamrock.example.test; + +import java.util.Collections; +import java.util.Properties; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.jboss.shamrock.test.junit.ShamrockTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.restassured.RestAssured; + +@ShamrockTest +public class KafkaProducerTest { + + + public static KafkaConsumer createConsumer() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList("test")); + return consumer; + } + + + @Test + public void test() throws Exception { + KafkaConsumer consumer = createConsumer(); + RestAssured.with().body("hello").post("/kafka"); + ConsumerRecord records = consumer.poll(10000).iterator().next(); + Assertions.assertEquals(records.key(), (Integer)0); + Assertions.assertEquals(records.value(), "hello"); + } + + +} diff --git a/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaTestResource.java b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaTestResource.java new file mode 100644 index 0000000000000..5b32b81fdc3c3 --- /dev/null +++ b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaTestResource.java @@ -0,0 +1,37 @@ +package org.jboss.shamrock.example.test; + +import java.io.File; +import java.util.Properties; + +import org.jboss.shamrock.test.common.ShamrockTestResourceLifecycleManager; + +import io.debezium.kafka.KafkaCluster; +import io.debezium.util.Testing; + +public class KafkaTestResource implements ShamrockTestResourceLifecycleManager { + + private KafkaCluster kafka; + + @Override + public void start() { + try { + Properties props = new Properties(); + props.setProperty("zookeeper.connection.timeout.ms", "10000"); + File directory = Testing.Files.createTestingDirectory("kafka-data", true); + kafka = new KafkaCluster().withPorts(2182, 19092) + .addBrokers(1) + .usingDirectory(directory) + .deleteDataUponShutdown(true) + .withKafkaConfiguration(props) + .deleteDataPriorToStartup(true) + .startup(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void stop() { + kafka.shutdown(); + } +}