diff --git a/bom/pom.xml b/bom/pom.xml
index 3d5878d58bd9cd..573d146fc3ab05 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -67,6 +67,13 @@
provided
+
+ org.jboss.shamrock
+ shamrock-kafka-client-deployment
+ ${project.version}
+ provided
+
+
org.jboss.shamrock
shamrock-smallrye-health-deployment
@@ -101,6 +108,12 @@
${project.version}
provided
+
+ org.jboss.shamrock
+ shamrock-smallrye-reactive-messaging-kafka-deployment
+ ${project.version}
+ provided
+
org.jboss.shamrock
diff --git a/build-parent/pom.xml b/build-parent/pom.xml
index 74f55fe35b14e6..ac5ba7d975198f 100644
--- a/build-parent/pom.xml
+++ b/build-parent/pom.xml
@@ -125,6 +125,10 @@
1.10.6
3.3.2.Final
0.0.1
+ 1.1.0
+ 1.1.0
+ 0.8.3.Final
+ 3.4.10
@@ -229,6 +233,16 @@
shamrock-jdbc-mariadb-runtime
${project.version}
+
+ org.jboss.shamrock
+ shamrock-kafka-client-deployment
+ ${project.version}
+
+
+ org.jboss.shamrock
+ shamrock-kafka-client-runtime
+ ${project.version}
+
org.jboss.shamrock
shamrock-smallrye-health-deployment
@@ -513,6 +527,16 @@
shamrock-smallrye-reactive-messaging-runtime
${project.version}
+
+ org.jboss.shamrock
+ shamrock-smallrye-reactive-messaging-kafka-deployment
+ ${project.version}
+
+
+ org.jboss.shamrock
+ shamrock-smallrye-reactive-messaging-kafka-runtime
+ ${project.version}
+
org.jboss.shamrock
shamrock-spring-di-deployment
@@ -618,6 +642,11 @@
jackson-databind
${jackson.version}
+
+ com.fasterxml.jackson.core
+ jackson-core
+ ${jackson.version}
+
com.sun.activation
jakarta.activation
@@ -707,6 +736,30 @@
+
+
+ io.debezium
+ debezium-core
+ ${debezium.version}
+ test
+
+
+ io.debezium
+ debezium-core
+ ${debezium.version}
+ test-jar
+ test
+
+
+ org.scala-lang
+ scala-reflect
+ 2.12.2
+
+
+ org.scala-lang
+ scala-library
+ 2.12.2
+
io.undertow
undertow-servlet
@@ -858,6 +911,22 @@
httpcore
${httpcore.version}
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka-clients.version}
+
+
+ org.apache.kafka
+ kafka_2.12
+ ${kafka2.version}
+ test
+
+
+ org.apache.zookeeper
+ zookeeper
+ ${zookeeper.version}
+
org.apache.logging.log4j
log4j-api
diff --git a/extensions/reactive-messaging/kafka-connector-deployment/pom.xml b/extensions/kafka-client/deployment/pom.xml
similarity index 75%
rename from extensions/reactive-messaging/kafka-connector-deployment/pom.xml
rename to extensions/kafka-client/deployment/pom.xml
index ecc22266a4f41e..88b73094433622 100644
--- a/extensions/reactive-messaging/kafka-connector-deployment/pom.xml
+++ b/extensions/kafka-client/deployment/pom.xml
@@ -6,11 +6,11 @@
org.jboss.shamrock
- shamrock-reactive-messaging
+ shamrock-kafka-client
1.0.0.Alpha1-SNAPSHOT
- shamrock-reactive-messaging-kafka-connector-deployment
+ shamrock-kafka-client-deployment
@@ -19,15 +19,8 @@
org.jboss.shamrock
- shamrock-arc-deployment
+ shamrock-kafka-client-runtime
-
- org.jboss.shamrock
- shamrock-reactive-messaging-kafka-connector-runtime
-
- ${project.version}
-
-
org.jboss.shamrock
shamrock-junit5-internal
diff --git a/extensions/kafka-client/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaProcessor.java
new file mode 100644
index 00000000000000..6ace8940bce5d3
--- /dev/null
+++ b/extensions/kafka-client/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaProcessor.java
@@ -0,0 +1,87 @@
+package org.jboss.shamrock.reactivemessaging;
+
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.apache.kafka.common.serialization.ByteBufferSerializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.DoubleSerializer;
+import org.apache.kafka.common.serialization.FloatDeserializer;
+import org.apache.kafka.common.serialization.FloatSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.ShortDeserializer;
+import org.apache.kafka.common.serialization.ShortSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.jboss.jandex.ClassInfo;
+import org.jboss.jandex.DotName;
+import org.jboss.shamrock.deployment.annotations.BuildProducer;
+import org.jboss.shamrock.deployment.annotations.BuildStep;
+import org.jboss.shamrock.deployment.builditem.CombinedIndexBuildItem;
+import org.jboss.shamrock.deployment.builditem.substrate.ReflectiveClassBuildItem;
+
+import java.util.Collection;
+
+public class KafkaProcessor {
+
+ static final Class[] BUILT_INS = {
+ //serializers
+ ShortSerializer.class,
+ DoubleSerializer.class,
+ LongSerializer.class,
+ BytesSerializer.class,
+ ByteArraySerializer.class,
+ IntegerSerializer.class,
+ ByteBufferSerializer.class,
+ StringSerializer.class,
+ FloatSerializer.class,
+
+ //deserializers
+ ShortDeserializer.class,
+ DoubleDeserializer.class,
+ LongDeserializer.class,
+ BytesDeserializer.class,
+ ByteArrayDeserializer.class,
+ IntegerDeserializer.class,
+ ByteBufferDeserializer.class,
+ StringDeserializer.class,
+ FloatDeserializer.class,
+ };
+
+ @BuildStep
+ public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer reflectiveClass) {
+ Collection serializers = indexBuildItem.getIndex()
+ .getAllKnownSubclasses(DotName.createSimple(Serializer.class.getName()));
+ Collection deserializers = indexBuildItem.getIndex()
+ .getAllKnownSubclasses(DotName.createSimple(Deserializer.class.getName()));
+
+ for(Class i : BUILT_INS) {
+ reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, i.getName()));
+ }
+
+ for (ClassInfo s : serializers) {
+ reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, s.toString()));
+ }
+
+ for (ClassInfo s : deserializers) {
+ reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, s.toString()));
+ }
+
+ // Also
+ // Kafka has is heavily using reflection - at least these 2 classes are instantiated
+ // The first to produce
+ // The second to consume
+ reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, DefaultPartitioner.class.getName()));
+ reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, RangeAssignor.class.getName()));
+
+ }
+}
diff --git a/extensions/kafka-client/pom.xml b/extensions/kafka-client/pom.xml
new file mode 100644
index 00000000000000..d2f68a3e785638
--- /dev/null
+++ b/extensions/kafka-client/pom.xml
@@ -0,0 +1,21 @@
+
+
+
+ shamrock-build-parent
+ org.jboss.shamrock
+ 1.0.0.Alpha1-SNAPSHOT
+ ../../build-parent/pom.xml
+
+
+ 4.0.0
+ shamrock-kafka-client
+ Shamrock - Kafka - Client
+ pom
+
+
+ deployment
+ runtime
+
+
diff --git a/extensions/kafka-client/runtime/pom.xml b/extensions/kafka-client/runtime/pom.xml
new file mode 100644
index 00000000000000..81583f78f4a1e9
--- /dev/null
+++ b/extensions/kafka-client/runtime/pom.xml
@@ -0,0 +1,50 @@
+
+
+ 4.0.0
+
+
+ org.jboss.shamrock
+ shamrock-kafka-client
+ 1.0.0.Alpha1-SNAPSHOT
+
+
+ shamrock-kafka-client-runtime
+
+
+
+
+ org.apache.kafka
+ kafka-clients
+
+
+
+ com.oracle.substratevm
+ svm
+
+
+
+
+
+
+
+ maven-dependency-plugin
+
+
+ maven-compiler-plugin
+
+
+
+ org.jboss.shamrock
+ shamrock-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/extensions/reactive-messaging/kafka-connector-runtime/src/main/java/io/smallrye/reactive/kafka/graal/SubstituteSnappy.java b/extensions/kafka-client/runtime/src/main/java/io/smallrye/reactive/kafka/graal/SubstituteSnappy.java
similarity index 100%
rename from extensions/reactive-messaging/kafka-connector-runtime/src/main/java/io/smallrye/reactive/kafka/graal/SubstituteSnappy.java
rename to extensions/kafka-client/runtime/src/main/java/io/smallrye/reactive/kafka/graal/SubstituteSnappy.java
diff --git a/extensions/pom.xml b/extensions/pom.xml
index b15386e7f97aff..9ac628f9b6660f 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -57,6 +57,7 @@
netty
reactive-streams-operators
smallrye-reactive-messaging
+ smallrye-reactive-messaging-kafka
narayana-jta
@@ -65,6 +66,7 @@
hibernate-orm
hibernate-validator
panache
+ kafka-client
spring-di
diff --git a/extensions/reactive-messaging/kafka-connector-deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java b/extensions/reactive-messaging/kafka-connector-deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java
deleted file mode 100644
index 8cffeff3ee2692..00000000000000
--- a/extensions/reactive-messaging/kafka-connector-deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.jboss.shamrock.reactivemessaging;
-
-import io.vertx.kafka.client.serialization.JsonObjectDeserializer;
-import io.vertx.kafka.client.serialization.JsonObjectSerializer;
-import org.apache.kafka.clients.consumer.RangeAssignor;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.jboss.jandex.ClassInfo;
-import org.jboss.jandex.DotName;
-import org.jboss.shamrock.deployment.annotations.BuildProducer;
-import org.jboss.shamrock.deployment.annotations.BuildStep;
-import org.jboss.shamrock.deployment.builditem.CombinedIndexBuildItem;
-import org.jboss.shamrock.deployment.builditem.substrate.ReflectiveClassBuildItem;
-
-import java.util.Collection;
-
-public class KafkaCodecProcessor {
-
- @BuildStep
- public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer reflectiveClass) {
- Collection serializers = indexBuildItem.getIndex()
- .getAllKnownSubclasses(DotName.createSimple(Serializer.class.getName()));
- Collection deserializers = indexBuildItem.getIndex()
- .getAllKnownSubclasses(DotName.createSimple(Deserializer.class.getName()));
-
- // TODO For some reason it does not seem to work:
-
- for (ClassInfo s : serializers) {
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, s.toString()));
- }
-
- for (ClassInfo s : deserializers) {
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, s.toString()));
- }
-
- // Explicit list them because of issue mentioned above ^
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, StringSerializer.class.getName()));
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, JsonObjectSerializer.class.getName()));
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, StringDeserializer.class.getName()));
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, JsonObjectDeserializer.class.getName()));
-
-
- // Also
- // Kafka has is heavily using reflection - at least these 2 classes are instantiated
- // The first to produce
- // The second to consume
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, DefaultPartitioner.class.getName()));
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, RangeAssignor.class.getName()));
-
- }
-}
diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/pom.xml b/extensions/smallrye-reactive-messaging-kafka/deployment/pom.xml
new file mode 100644
index 00000000000000..74275b4b017ccb
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging-kafka/deployment/pom.xml
@@ -0,0 +1,60 @@
+
+
+ 4.0.0
+
+
+ org.jboss.shamrock
+ shamrock-smallrye-reactive-messaging-kafka
+ 1.0.0.Alpha1-SNAPSHOT
+
+
+ shamrock-smallrye-reactive-messaging-kafka-deployment
+
+
+
+ org.jboss.shamrock
+ shamrock-core-deployment
+
+
+ org.jboss.shamrock
+ shamrock-arc-deployment
+
+
+ org.jboss.shamrock
+ shamrock-kafka-client-deployment
+
+
+ org.jboss.shamrock
+ shamrock-smallrye-reactive-messaging-kafka-runtime
+
+
+ org.jboss.shamrock
+ shamrock-vertx-deployment
+
+
+
+ org.jboss.shamrock
+ shamrock-junit5-internal
+
+
+
+
+
+
+ maven-compiler-plugin
+
+
+
+ org.jboss.shamrock
+ shamrock-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java
new file mode 100644
index 00000000000000..8add59c3a33a63
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/org/jboss/shamrock/reactivemessaging/KafkaCodecProcessor.java
@@ -0,0 +1,35 @@
+package org.jboss.shamrock.reactivemessaging;
+
+import org.jboss.shamrock.deployment.annotations.BuildProducer;
+import org.jboss.shamrock.deployment.annotations.BuildStep;
+import org.jboss.shamrock.deployment.builditem.CombinedIndexBuildItem;
+import org.jboss.shamrock.deployment.builditem.substrate.ReflectiveClassBuildItem;
+
+import io.vertx.kafka.client.serialization.BufferDeserializer;
+import io.vertx.kafka.client.serialization.BufferSerializer;
+import io.vertx.kafka.client.serialization.JsonArrayDeserializer;
+import io.vertx.kafka.client.serialization.JsonArraySerializer;
+import io.vertx.kafka.client.serialization.JsonObjectDeserializer;
+import io.vertx.kafka.client.serialization.JsonObjectSerializer;
+
+public class KafkaCodecProcessor {
+
+ static final Class>[] BUILT_INS = {
+ JsonObjectSerializer.class,
+ BufferSerializer.class,
+ JsonArraySerializer.class,
+
+
+ JsonObjectDeserializer.class,
+ BufferDeserializer.class,
+ JsonArrayDeserializer.class,
+ };
+
+ @BuildStep
+ public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer reflectiveClass) {
+
+ for (Class> s : BUILT_INS) {
+ reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, s.toString()));
+ }
+ }
+}
diff --git a/extensions/smallrye-reactive-messaging-kafka/pom.xml b/extensions/smallrye-reactive-messaging-kafka/pom.xml
new file mode 100644
index 00000000000000..711717e1c549cf
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging-kafka/pom.xml
@@ -0,0 +1,21 @@
+
+
+
+ shamrock-build-parent
+ org.jboss.shamrock
+ 1.0.0.Alpha1-SNAPSHOT
+ ../../build-parent/pom.xml
+
+
+ 4.0.0
+ shamrock-smallrye-reactive-messaging-kafka
+ Shamrock - SmallRye Reactive Messaging - Kafka
+ pom
+
+
+ deployment
+ runtime
+
+
diff --git a/extensions/reactive-messaging/kafka-connector-runtime/pom.xml b/extensions/smallrye-reactive-messaging-kafka/runtime/pom.xml
similarity index 75%
rename from extensions/reactive-messaging/kafka-connector-runtime/pom.xml
rename to extensions/smallrye-reactive-messaging-kafka/runtime/pom.xml
index 7181a77112b2d4..7d59ea8cc8f445 100644
--- a/extensions/reactive-messaging/kafka-connector-runtime/pom.xml
+++ b/extensions/smallrye-reactive-messaging-kafka/runtime/pom.xml
@@ -6,13 +6,17 @@
org.jboss.shamrock
- shamrock-reactive-messaging
+ shamrock-smallrye-reactive-messaging-kafka
1.0.0.Alpha1-SNAPSHOT
- shamrock-reactive-messaging-kafka-connector-runtime
+ shamrock-smallrye-reactive-messaging-kafka-runtime
+
+ org.jboss.shamrock
+ shamrock-kafka-client-runtime
+
io.smallrye.reactive
smallrye-reactive-messaging-kafka
@@ -44,7 +48,6 @@
org.apache.kafka
kafka-clients
- 1.1.0
@@ -74,10 +77,9 @@
-
org.jboss.shamrock
- shamrock-reactive-streams-operators-runtime
+ shamrock-smallrye-reactive-streams-operators-runtime
io.smallrye.reactive
@@ -96,6 +98,25 @@
+
+
+
+ maven-dependency-plugin
+
+
+ maven-compiler-plugin
+
+
+
+ org.jboss.shamrock
+ shamrock-extension-processor
+ ${project.version}
+
+
+
+
+
+
\ No newline at end of file
diff --git a/extensions/smallrye-reactive-messaging/pom.xml b/extensions/smallrye-reactive-messaging/pom.xml
index 9aec40003078e9..8331e2ecb9c5a9 100644
--- a/extensions/smallrye-reactive-messaging/pom.xml
+++ b/extensions/smallrye-reactive-messaging/pom.xml
@@ -17,8 +17,5 @@
deployment
runtime
-
- kafka-connector-deployment
- kafka-connector-runtime
diff --git a/integration-tests/main/pom.xml b/integration-tests/main/pom.xml
index ba9be05cc8a582..7d9319d9aba743 100644
--- a/integration-tests/main/pom.xml
+++ b/integration-tests/main/pom.xml
@@ -154,6 +154,11 @@
shamrock-integration-test-common-jpa-entities
${project.version}
+
+ org.jboss.shamrock
+ shamrock-kafka-client-deployment
+ provided
+
@@ -171,6 +176,22 @@
rest-assured
test
+
+ io.debezium
+ debezium-core
+ test
+
+
+ io.debezium
+ debezium-core
+ test-jar
+ test
+
+
+ org.apache.kafka
+ kafka_2.12
+ test
+
diff --git a/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaConsumerManager.java b/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaConsumerManager.java
new file mode 100644
index 00000000000000..9cedd92dd6b18d
--- /dev/null
+++ b/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaConsumerManager.java
@@ -0,0 +1,43 @@
+package org.jboss.shamrock.example.kafka;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import javax.annotation.PostConstruct;
+import javax.enterprise.context.ApplicationScoped;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+@ApplicationScoped
+public class KafkaConsumerManager {
+
+ public static KafkaConsumer createConsumer() {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ KafkaConsumer consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Collections.singletonList("test-consumer"));
+ return consumer;
+ }
+
+ private int count;
+ private Consumer consumer;
+
+ @PostConstruct
+ public void create() {
+ consumer = createConsumer();
+ }
+
+ public String receive() {
+ return consumer.poll(10000).iterator().next().value();
+ }
+
+}
diff --git a/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaEndpoint.java b/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaEndpoint.java
new file mode 100644
index 00000000000000..7b7ea6e18c805e
--- /dev/null
+++ b/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaEndpoint.java
@@ -0,0 +1,26 @@
+package org.jboss.shamrock.example.kafka;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+
+@Path("/kafka")
+public class KafkaEndpoint {
+
+ @Inject
+ KafkaProducerManager manager;
+
+ @Inject
+ KafkaConsumerManager consumer;
+
+ @POST
+ public void post(String message) {
+ manager.send(message);
+ }
+
+ @GET
+ public String get() {
+ return consumer.receive();
+ }
+}
diff --git a/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaProducerManager.java b/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaProducerManager.java
new file mode 100644
index 00000000000000..82dddcb7834ce8
--- /dev/null
+++ b/integration-tests/main/src/main/java/org/jboss/shamrock/example/kafka/KafkaProducerManager.java
@@ -0,0 +1,40 @@
+package org.jboss.shamrock.example.kafka;
+
+import java.util.Properties;
+
+import javax.annotation.PostConstruct;
+import javax.enterprise.context.ApplicationScoped;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+@ApplicationScoped
+public class KafkaProducerManager {
+
+ public static Producer createProducer() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "test");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ return new KafkaProducer(props);
+ }
+
+ private int count;
+ private Producer producer;
+
+ @PostConstruct
+ public void create() {
+ producer = createProducer();
+ }
+
+ public void send(String message) {
+ producer.send(new ProducerRecord<>("test", count++, message));
+ producer.flush();
+ }
+
+}
diff --git a/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerITCase.java b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerITCase.java
new file mode 100644
index 00000000000000..a52efc9956b0d8
--- /dev/null
+++ b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerITCase.java
@@ -0,0 +1,10 @@
+package org.jboss.shamrock.example.test;
+
+import org.jboss.shamrock.test.junit.SubstrateTest;
+
+@SubstrateTest
+public class KafkaConsumerITCase extends KafkaConsumerTest {
+
+
+
+}
diff --git a/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerTest.java b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerTest.java
new file mode 100644
index 00000000000000..d6bd1b346ae706
--- /dev/null
+++ b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaConsumerTest.java
@@ -0,0 +1,40 @@
+package org.jboss.shamrock.example.test;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.hamcrest.Matchers;
+import org.jboss.shamrock.test.common.ShamrockTestResource;
+import org.jboss.shamrock.test.junit.ShamrockTest;
+import org.junit.jupiter.api.Test;
+
+import io.restassured.RestAssured;
+
+@ShamrockTestResource(KafkaTestResource.class)
+@ShamrockTest
+public class KafkaConsumerTest {
+
+
+ public static Producer createProducer() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-consumer");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ return new KafkaProducer(props);
+ }
+
+ @Test
+ public void test() throws Exception {
+ Producer consumer = createProducer();
+ consumer.send(new ProducerRecord<>("test-consumer", 1, "hi world"));
+ RestAssured.when().get("/kafka").then().body(Matchers.is("hi world"));
+ }
+
+
+}
diff --git a/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerITCase.java b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerITCase.java
new file mode 100644
index 00000000000000..a68a64bc2dca7a
--- /dev/null
+++ b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerITCase.java
@@ -0,0 +1,10 @@
+package org.jboss.shamrock.example.test;
+
+import org.jboss.shamrock.test.junit.SubstrateTest;
+
+@SubstrateTest
+public class KafkaProducerITCase extends KafkaProducerTest {
+
+
+
+}
diff --git a/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerTest.java b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerTest.java
new file mode 100644
index 00000000000000..8f5da6ff2b3a73
--- /dev/null
+++ b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaProducerTest.java
@@ -0,0 +1,45 @@
+package org.jboss.shamrock.example.test;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.jboss.shamrock.test.junit.ShamrockTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.restassured.RestAssured;
+
+@ShamrockTest
+public class KafkaProducerTest {
+
+
+ public static KafkaConsumer createConsumer() {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ KafkaConsumer consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Collections.singletonList("test"));
+ return consumer;
+ }
+
+
+ @Test
+ public void test() throws Exception {
+ KafkaConsumer consumer = createConsumer();
+ RestAssured.with().body("hello").post("/kafka");
+ ConsumerRecord records = consumer.poll(10000).iterator().next();
+ Assertions.assertEquals(records.key(), (Integer)0);
+ Assertions.assertEquals(records.value(), "hello");
+ }
+
+
+}
diff --git a/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaTestResource.java b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaTestResource.java
new file mode 100644
index 00000000000000..5b32b81fdc3c32
--- /dev/null
+++ b/integration-tests/main/src/test/java/org/jboss/shamrock/example/test/KafkaTestResource.java
@@ -0,0 +1,37 @@
+package org.jboss.shamrock.example.test;
+
+import java.io.File;
+import java.util.Properties;
+
+import org.jboss.shamrock.test.common.ShamrockTestResourceLifecycleManager;
+
+import io.debezium.kafka.KafkaCluster;
+import io.debezium.util.Testing;
+
+public class KafkaTestResource implements ShamrockTestResourceLifecycleManager {
+
+ private KafkaCluster kafka;
+
+ @Override
+ public void start() {
+ try {
+ Properties props = new Properties();
+ props.setProperty("zookeeper.connection.timeout.ms", "10000");
+ File directory = Testing.Files.createTestingDirectory("kafka-data", true);
+ kafka = new KafkaCluster().withPorts(2182, 19092)
+ .addBrokers(1)
+ .usingDirectory(directory)
+ .deleteDataUponShutdown(true)
+ .withKafkaConfiguration(props)
+ .deleteDataPriorToStartup(true)
+ .startup();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ kafka.shutdown();
+ }
+}