forked from quarkusio/quarkus
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
quarkusio#2663 Adding integration test for Kafka Streams
- Loading branch information
1 parent
2a4bf29
commit a0d8f91
Showing
5 changed files
with
328 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
84 changes: 84 additions & 0 deletions
84
integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonObjectSerde.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
package io.quarkus.it.kafka; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.util.Map; | ||
|
||
import javax.json.Json; | ||
import javax.json.JsonObject; | ||
import javax.json.JsonReader; | ||
|
||
import org.apache.kafka.common.serialization.Deserializer; | ||
import org.apache.kafka.common.serialization.Serde; | ||
import org.apache.kafka.common.serialization.Serializer; | ||
|
||
/** | ||
* A {@link Serde} that (de-)serializes JSON. | ||
*/ | ||
public class JsonObjectSerde implements Serde<JsonObject> { | ||
|
||
public JsonObjectSerde() { | ||
} | ||
|
||
@Override | ||
public void configure(Map<String, ?> configs, boolean isKey) { | ||
} | ||
|
||
@Override | ||
public void close() { | ||
} | ||
|
||
@Override | ||
public Serializer<JsonObject> serializer() { | ||
return new JsonSerializer(); | ||
} | ||
|
||
@Override | ||
public Deserializer<JsonObject> deserializer() { | ||
return new JsonDeserializer(); | ||
} | ||
|
||
private final class JsonDeserializer implements Deserializer<JsonObject> { | ||
|
||
@Override | ||
public void configure(Map<String, ?> configs, boolean isKey) { | ||
} | ||
|
||
@Override | ||
public JsonObject deserialize(String topic, byte[] data) { | ||
if (data == null) { | ||
return null; | ||
} | ||
|
||
try (JsonReader reader = Json.createReader(new ByteArrayInputStream(data))) { | ||
return reader.readObject(); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
} | ||
} | ||
|
||
private final class JsonSerializer implements Serializer<JsonObject> { | ||
|
||
@Override | ||
public void configure(Map<String, ?> configs, boolean isKey) { | ||
} | ||
|
||
@Override | ||
public byte[] serialize(String topic, JsonObject data) { | ||
try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { | ||
Json.createWriter(output).writeObject(data); | ||
return output.toByteArray(); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
} | ||
} | ||
} |
112 changes: 112 additions & 0 deletions
112
integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
package io.quarkus.it.kafka; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Properties; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import javax.enterprise.context.ApplicationScoped; | ||
import javax.enterprise.event.Observes; | ||
import javax.json.Json; | ||
import javax.json.JsonObject; | ||
import javax.json.JsonObjectBuilder; | ||
import javax.ws.rs.POST; | ||
import javax.ws.rs.Path; | ||
|
||
import org.apache.kafka.clients.CommonClientConfigs; | ||
import org.apache.kafka.clients.admin.AdminClient; | ||
import org.apache.kafka.clients.admin.ListTopicsResult; | ||
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import org.apache.kafka.common.serialization.Serdes; | ||
import org.apache.kafka.streams.KafkaStreams; | ||
import org.apache.kafka.streams.StreamsBuilder; | ||
import org.apache.kafka.streams.StreamsConfig; | ||
import org.apache.kafka.streams.kstream.Consumed; | ||
import org.apache.kafka.streams.kstream.Joined; | ||
import org.apache.kafka.streams.kstream.KTable; | ||
import org.apache.kafka.streams.kstream.Produced; | ||
|
||
import io.quarkus.runtime.StartupEvent; | ||
|
||
@ApplicationScoped | ||
@Path("/kafkastreams") | ||
public class KafkaStreamsPipeline { | ||
|
||
private KafkaStreams streams; | ||
|
||
private ExecutorService executor; | ||
|
||
void onStart(@Observes StartupEvent ev) { | ||
Properties props = new Properties(); | ||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test-pipeline"); | ||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); | ||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024); | ||
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); | ||
props.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 500); | ||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | ||
StreamsBuilder builder = new StreamsBuilder(); | ||
|
||
JsonObjectSerde jsonNodeSerde = new JsonObjectSerde(); | ||
KTable<Integer, JsonObject> categories = builder.table( | ||
"streams-test-categories", | ||
Consumed.with(Serdes.Integer(), jsonNodeSerde) | ||
); | ||
|
||
builder.stream("streams-test-customers", Consumed.with(Serdes.Integer(), jsonNodeSerde)) | ||
.selectKey((k, v) -> v.getJsonNumber("category").intValue()) | ||
.join( | ||
categories, | ||
(v1, v2) -> { | ||
JsonObjectBuilder target = Json.createObjectBuilder(); | ||
v1.forEach(target::add); | ||
target.add("category", v2); | ||
return target.build(); | ||
}, | ||
Joined.with(Serdes.Integer(), jsonNodeSerde, null) | ||
) | ||
.selectKey((k, v) -> v.getJsonNumber("id").intValue()) | ||
.to("streams-test-customers-processed", Produced.with(Serdes.Integer(), jsonNodeSerde)); | ||
|
||
streams = new KafkaStreams(builder.build(), props); | ||
|
||
executor = Executors.newSingleThreadExecutor(); | ||
executor.execute(() -> { | ||
waitForTopicsToBeCreated("localhost:19092"); | ||
streams.start(); | ||
}); | ||
} | ||
|
||
@POST | ||
@Path("/stop") | ||
public void stop() { | ||
streams.close(); | ||
} | ||
|
||
private void waitForTopicsToBeCreated(String bootstrapServers) { | ||
Map<String, Object> config = new HashMap<>(); | ||
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | ||
|
||
try (AdminClient adminClient = AdminClient.create(config)) { | ||
AtomicBoolean topicsCreated = new AtomicBoolean(false); | ||
|
||
while (topicsCreated.get() == false) { | ||
ListTopicsResult topics = adminClient.listTopics(); | ||
topics.names().whenComplete((t, e) -> { | ||
if (e != null) { | ||
throw new RuntimeException(e); | ||
} else if (t.contains("streams-test-categories") && t.contains("streams-test-customers")) { | ||
topicsCreated.set(true); | ||
} | ||
}); | ||
|
||
try { | ||
Thread.sleep(50); | ||
} catch (InterruptedException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
} | ||
} | ||
} |
8 changes: 8 additions & 0 deletions
8
integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsITCase.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package io.quarkus.it.main; | ||
|
||
import io.quarkus.test.junit.SubstrateTest; | ||
|
||
@SubstrateTest | ||
public class KafkaStreamsITCase extends KafkaStreamsTest { | ||
|
||
} |
115 changes: 115 additions & 0 deletions
115
integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
package io.quarkus.it.main; | ||
|
||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Properties; | ||
|
||
import org.apache.kafka.clients.consumer.Consumer; | ||
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.common.serialization.IntegerDeserializer; | ||
import org.apache.kafka.common.serialization.IntegerSerializer; | ||
import org.apache.kafka.common.serialization.StringDeserializer; | ||
import org.apache.kafka.common.serialization.StringSerializer; | ||
import org.junit.jupiter.api.AfterEach; | ||
import org.junit.jupiter.api.Assertions; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import io.quarkus.test.common.QuarkusTestResource; | ||
import io.quarkus.test.junit.QuarkusTest; | ||
import io.restassured.RestAssured; | ||
|
||
@QuarkusTestResource(KafkaTestResource.class) | ||
@QuarkusTest | ||
public class KafkaStreamsTest { | ||
|
||
private static Producer<Integer, String> createProducer() { | ||
Properties props = new Properties(); | ||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); | ||
props.put(ProducerConfig.CLIENT_ID_CONFIG, "streams-test-producer"); | ||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); | ||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | ||
|
||
return new KafkaProducer<Integer, String>(props); | ||
} | ||
|
||
private static KafkaConsumer<Integer, String> createConsumer() { | ||
Properties props = new Properties(); | ||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); | ||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-test-consumer"); | ||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); | ||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); | ||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); | ||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | ||
|
||
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props); | ||
consumer.subscribe(Collections.singletonList("streams-test-customers-processed")); | ||
return consumer; | ||
} | ||
|
||
@AfterEach | ||
public void stopKafkaStreams() { | ||
// explicitly stopping the pipeline *before* the broker is shut down, as it | ||
// otherwise will time out | ||
RestAssured.post("/kafkastreams/stop"); | ||
} | ||
|
||
@Test | ||
public void testKafkaStreams() throws Exception { | ||
Producer<Integer, String> producer = createProducer(); | ||
|
||
producer.send(new ProducerRecord<>("streams-test-categories", 1, | ||
"{ \"name\" : \"B2B\", \"value\" : \"business-to-business\" }")); | ||
producer.send(new ProducerRecord<>("streams-test-categories", 2, | ||
"{ \"name\" : \"B2C\", \"value\" : \"business-to-customer\" }")); | ||
|
||
producer.send( | ||
new ProducerRecord<>("streams-test-customers", 101, "{ \"id\" : 101, \"name\" : \"Bob\", \"category\" : 1 }")); | ||
producer.send(new ProducerRecord<>("streams-test-customers", 102, | ||
"{ \"id\" : 102, \"name\" : \"Becky\", \"category\" : 2 }")); | ||
producer.send(new ProducerRecord<>("streams-test-customers", 103, | ||
"{ \"id\" : 103, \"name\" : \"Bruce\", \"category\" : 1 }")); | ||
|
||
Consumer<Integer, String> consumer = createConsumer(); | ||
List<ConsumerRecord<Integer, String>> records = poll(consumer, 3); | ||
|
||
ConsumerRecord<Integer, String> record = records.get(0); | ||
Assertions.assertEquals(101, record.key()); | ||
Assertions.assertEquals( | ||
"{\"id\":101,\"name\":\"Bob\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}", | ||
record.value()); | ||
|
||
record = records.get(1); | ||
Assertions.assertEquals(102, record.key()); | ||
Assertions.assertEquals( | ||
"{\"id\":102,\"name\":\"Becky\",\"category\":{\"name\":\"B2C\",\"value\":\"business-to-customer\"}}", | ||
record.value()); | ||
|
||
record = records.get(2); | ||
Assertions.assertEquals(103, record.key()); | ||
Assertions.assertEquals( | ||
"{\"id\":103,\"name\":\"Bruce\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}", | ||
record.value()); | ||
} | ||
|
||
private List<ConsumerRecord<Integer, String>> poll(Consumer<Integer, String> consumer, int expectedRecordCount) { | ||
int fetched = 0; | ||
List<ConsumerRecord<Integer, String>> result = new ArrayList<>(); | ||
while (fetched < expectedRecordCount) { | ||
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(20000)); | ||
records.forEach(result::add); | ||
fetched = result.size(); | ||
} | ||
|
||
return result; | ||
} | ||
|
||
} |