From 2ba044da83f3dff1e01888e3050a4cb2af9db83a Mon Sep 17 00:00:00 2001 From: Will Li Date: Thu, 27 Jul 2023 14:30:51 +0800 Subject: [PATCH] update document for testing without broker --- docs/src/main/asciidoc/kafka.adoc | 71 +++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index 1491f500eeb14..25e142198b021 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -2017,11 +2017,14 @@ Create a Quarkus Test using the test resource created above: [source, java] ---- +import static org.awaitility.Awaitility.await; + @QuarkusTest @QuarkusTestResource(KafkaTestResourceLifecycleManager.class) class BaristaTest { @Inject + @Connector("smallrye-in-memory") InMemoryConnector connector; // <1> @Test @@ -2054,6 +2057,74 @@ class BaristaTest { The application will process this message and send a message to `beverages` channel. <5> Use the `received` method on `beverages` channel to check the messages produced by the application. +If your Kafka consumer is batch based, you will need to send a batch of messages to the channel as by creating them manually. + +For instance: + +[source, java] +---- +@ApplicationScoped +public class BeverageProcessor { + + @Incoming("orders") + CompletionStage process(KafkaRecordBatch orders) { + System.out.println("Order received " + orders.getPayload().size()); + return orders.ack(); + } +} +---- + +[source, java] +---- +import static org.awaitility.Awaitility.await; + +@QuarkusTest +@QuarkusTestResource(KafkaTestResourceLifecycleManager.class) +class BaristaTest { + + @Inject + @Connector("smallrye-in-memory") + + InMemoryConnector connector; + + @Test + void testProcessOrder() { + InMemorySource> ordersIn = connector.source("orders"); + var committed = new AtomicBoolean(false); // <1> + var commitHandler = new KafkaCommitHandler() { + @Override + public Uni handle(IncomingKafkaRecord record) { + committed.set(true); // <2> + return null; + } + }; + var failureHandler = new KafkaFailureHandler() { + @Override + public Uni handle(IncomingKafkaRecord record, Throwable reason, Metadata metadata) { + return null; + } + }; + + Order order = new Order(); + order.setProduct("coffee"); + order.setName("Coffee lover"); + order.setOrderId("1234"); + var record = new ConsumerRecord<>("topic", 0, 0, "key", order); + var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 1), List.of(record))); + var batch = new IncomingKafkaRecordBatch<>( + records, "kafka", 0, commitHandler, failureHandler, false, false); // <3> + + ordersIn.send(batch); + + await().until(committed::get); // <4> + } +} +---- +<1> Create an `AtomicBoolean` to track if the batch has been committed. +<2> Update `committed` when the batch is committed. +<3> Create a `IncomingKafkaRecordBatch` with a single record. +<4> Wait until the batch is committed. + [IMPORTANT] ==== With in-memory channels we were able to test application code processing messages without starting a Kafka broker.