forked from quarkusio/quarkus
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
4bed5b9
commit 39e0b4c
Showing
5 changed files
with
317 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
88 changes: 88 additions & 0 deletions
88
integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonNodeSerde.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package io.quarkus.it.kafka; | ||
|
||
import java.io.IOException; | ||
import java.util.Map; | ||
|
||
import org.apache.kafka.common.serialization.Deserializer; | ||
import org.apache.kafka.common.serialization.Serde; | ||
import org.apache.kafka.common.serialization.Serializer; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
||
/** | ||
* A {@link Serde} that (de-)serializes JSON. | ||
*/ | ||
public class JsonNodeSerde implements Serde<JsonNode> { | ||
|
||
private final ObjectMapper mapper; | ||
|
||
public JsonNodeSerde() { | ||
mapper = new ObjectMapper(); | ||
} | ||
|
||
@Override | ||
public void configure(Map<String, ?> configs, boolean isKey) { | ||
} | ||
|
||
@Override | ||
public void close() { | ||
} | ||
|
||
@Override | ||
public Serializer<JsonNode> serializer() { | ||
return new JsonSerializer(); | ||
} | ||
|
||
@Override | ||
public Deserializer<JsonNode> deserializer() { | ||
return new JsonDeserializer(); | ||
} | ||
|
||
private final class JsonDeserializer implements Deserializer<JsonNode> { | ||
|
||
@Override | ||
public void configure(Map<String, ?> configs, boolean isKey) { | ||
} | ||
|
||
@Override | ||
public JsonNode deserialize(String topic, byte[] data) { | ||
if (data == null) { | ||
return null; | ||
} | ||
|
||
try { | ||
return mapper.readTree(data); | ||
} | ||
catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
} | ||
} | ||
|
||
private final class JsonSerializer implements Serializer<JsonNode> { | ||
|
||
@Override | ||
public void configure(Map<String, ?> configs, boolean isKey) { | ||
} | ||
|
||
@Override | ||
public byte[] serialize(String topic, JsonNode data) { | ||
try { | ||
return mapper.writeValueAsBytes(data); | ||
} | ||
catch (JsonProcessingException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
} | ||
} | ||
} |
104 changes: 104 additions & 0 deletions
104
integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package io.quarkus.it.kafka; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Properties; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import javax.enterprise.context.ApplicationScoped; | ||
import javax.enterprise.event.Observes; | ||
|
||
import org.apache.kafka.clients.CommonClientConfigs; | ||
import org.apache.kafka.clients.admin.AdminClient; | ||
import org.apache.kafka.clients.admin.ListTopicsResult; | ||
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import org.apache.kafka.common.serialization.Serdes; | ||
import org.apache.kafka.streams.KafkaStreams; | ||
import org.apache.kafka.streams.StreamsBuilder; | ||
import org.apache.kafka.streams.StreamsConfig; | ||
import org.apache.kafka.streams.kstream.Consumed; | ||
import org.apache.kafka.streams.kstream.Joined; | ||
import org.apache.kafka.streams.kstream.KTable; | ||
import org.apache.kafka.streams.kstream.Produced; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.node.ObjectNode; | ||
|
||
import io.quarkus.runtime.StartupEvent; | ||
|
||
@ApplicationScoped | ||
public class KafkaStreamsPipeline { | ||
|
||
private KafkaStreams streams; | ||
|
||
private ExecutorService executor; | ||
|
||
void onStart(@Observes StartupEvent ev) { | ||
Properties props = new Properties(); | ||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test-pipeline"); | ||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); | ||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024); | ||
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); | ||
props.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 500); | ||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | ||
StreamsBuilder builder = new StreamsBuilder(); | ||
|
||
JsonNodeSerde jsonNodeSerde = new JsonNodeSerde(); | ||
|
||
KTable<Integer, JsonNode> categories = builder.table("streams-test-categories", Consumed.with(Serdes.Integer(), jsonNodeSerde)); | ||
|
||
builder.stream("streams-test-customers", Consumed.with(Serdes.Integer(), jsonNodeSerde)) | ||
.selectKey((k, v) -> v.get("category").asInt()) | ||
.join( | ||
categories, | ||
(v1, v2) -> { | ||
((ObjectNode) v1).replace("category", v2); | ||
return v1; | ||
}, | ||
Joined.with(Serdes.Integer(), jsonNodeSerde, null) | ||
) | ||
.selectKey((k, v) -> v.get("id").asInt()) | ||
.to("streams-test-customers-processed", Produced.with(Serdes.Integer(), jsonNodeSerde)); | ||
|
||
streams = new KafkaStreams(builder.build(), props); | ||
|
||
executor = Executors.newSingleThreadExecutor(); | ||
executor.execute(() -> { | ||
waitForTopicsToBeCreated("localhost:19092"); | ||
streams.start(); | ||
}); | ||
} | ||
|
||
public void stop() { | ||
streams.close(); | ||
} | ||
private void waitForTopicsToBeCreated(String bootstrapServers) { | ||
Map<String, Object> config = new HashMap<>(); | ||
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | ||
|
||
try (AdminClient adminClient = AdminClient.create(config)) { | ||
AtomicBoolean topicsCreated = new AtomicBoolean(false); | ||
|
||
while (topicsCreated.get() == false) { | ||
ListTopicsResult topics = adminClient.listTopics(); | ||
topics.names().whenComplete((t, e) -> { | ||
if (e != null) { | ||
throw new RuntimeException(e); | ||
} | ||
else if (t.contains("streams-test-categories") && t.contains("streams-test-customers")) { | ||
topicsCreated.set(true); | ||
} | ||
}); | ||
|
||
try { | ||
Thread.sleep(1000); | ||
} | ||
catch (InterruptedException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
} | ||
} | ||
} |
8 changes: 8 additions & 0 deletions
8
integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsITCase.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package io.quarkus.it.main; | ||
|
||
import io.quarkus.test.junit.SubstrateTest; | ||
|
||
@SubstrateTest | ||
public class KafkaStreamsITCase extends KafkaStreamsTest { | ||
|
||
} |
109 changes: 109 additions & 0 deletions
109
integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package io.quarkus.it.main; | ||
|
||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Properties; | ||
|
||
import javax.inject.Inject; | ||
|
||
import org.apache.kafka.clients.consumer.Consumer; | ||
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.common.serialization.IntegerDeserializer; | ||
import org.apache.kafka.common.serialization.IntegerSerializer; | ||
import org.apache.kafka.common.serialization.StringDeserializer; | ||
import org.apache.kafka.common.serialization.StringSerializer; | ||
import org.junit.jupiter.api.AfterEach; | ||
import org.junit.jupiter.api.Assertions; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import io.quarkus.it.kafka.KafkaStreamsPipeline; | ||
import io.quarkus.test.common.QuarkusTestResource; | ||
import io.quarkus.test.junit.QuarkusTest; | ||
|
||
@QuarkusTestResource(KafkaTestResource.class) | ||
@QuarkusTest | ||
public class KafkaStreamsTest { | ||
|
||
@Inject | ||
private KafkaStreamsPipeline pipeline; | ||
|
||
private static Producer<Integer, String> createProducer() { | ||
Properties props = new Properties(); | ||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); | ||
props.put(ProducerConfig.CLIENT_ID_CONFIG, "streams-test-producer"); | ||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); | ||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | ||
|
||
return new KafkaProducer<Integer, String>(props); | ||
} | ||
|
||
private static KafkaConsumer<Integer, String> createConsumer() { | ||
Properties props = new Properties(); | ||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); | ||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-test-consumer"); | ||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); | ||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); | ||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); | ||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | ||
|
||
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props); | ||
consumer.subscribe(Collections.singletonList("streams-test-customers-processed")); | ||
return consumer; | ||
} | ||
|
||
@AfterEach | ||
public void stopKafkaStreams() { | ||
// explicitly stopping the pipeline *before* the broker is shut down, as it | ||
// otherwise will time out | ||
pipeline.stop(); | ||
} | ||
|
||
@Test | ||
public void test() throws Exception { | ||
Producer<Integer, String> producer = createProducer(); | ||
|
||
producer.send(new ProducerRecord<>("streams-test-categories", 1, "{ \"name\" : \"B2B\", \"value\" : \"business-to-business\" }")); | ||
producer.send(new ProducerRecord<>("streams-test-categories", 2, "{ \"name\" : \"B2C\", \"value\" : \"business-to-customer\" }")); | ||
|
||
producer.send(new ProducerRecord<>("streams-test-customers", 101, "{ \"id\" : 101, \"name\" : \"Bob\", \"category\" : 1 }")); | ||
producer.send(new ProducerRecord<>("streams-test-customers", 102, "{ \"id\" : 102, \"name\" : \"Becky\", \"category\" : 2 }")); | ||
producer.send(new ProducerRecord<>("streams-test-customers", 103, "{ \"id\" : 103, \"name\" : \"Bruce\", \"category\" : 1 }")); | ||
|
||
Consumer<Integer, String> consumer = createConsumer(); | ||
List<ConsumerRecord<Integer, String>> records = poll(consumer, 3); | ||
|
||
ConsumerRecord<Integer, String> record = records.get(0); | ||
Assertions.assertEquals(101, record.key()); | ||
Assertions.assertEquals("{\"id\":101,\"name\":\"Bob\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}", record.value()); | ||
|
||
record = records.get(1); | ||
Assertions.assertEquals(102, record.key()); | ||
Assertions.assertEquals("{\"id\":102,\"name\":\"Becky\",\"category\":{\"name\":\"B2C\",\"value\":\"business-to-customer\"}}", record.value()); | ||
|
||
record = records.get(2); | ||
Assertions.assertEquals(103, record.key()); | ||
Assertions.assertEquals("{\"id\":103,\"name\":\"Bruce\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}", record.value()); | ||
} | ||
|
||
private List<ConsumerRecord<Integer, String>> poll(Consumer<Integer, String> consumer, int expectedRecordCount) { | ||
int fetched = 0; | ||
List<ConsumerRecord<Integer, String>> result = new ArrayList<>(); | ||
while(fetched < expectedRecordCount) { | ||
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(20000)); | ||
records.forEach(result::add); | ||
fetched = result.size(); | ||
} | ||
|
||
return result; | ||
} | ||
|
||
} |