Skip to content

Commit

Permalink
update document for testing without broker
Browse files Browse the repository at this point in the history
  • Loading branch information
cuichenli committed Sep 25, 2023
1 parent 383c126 commit 2ba044d
Showing 1 changed file with 71 additions and 0 deletions.
71 changes: 71 additions & 0 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2017,11 +2017,14 @@ Create a Quarkus Test using the test resource created above:

[source, java]
----
import static org.awaitility.Awaitility.await;
@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {
@Inject
@Connector("smallrye-in-memory")
InMemoryConnector connector; // <1>
@Test
Expand Down Expand Up @@ -2054,6 +2057,74 @@ class BaristaTest {
The application will process this message and send a message to `beverages` channel.
<5> Use the `received` method on `beverages` channel to check the messages produced by the application.

If your Kafka consumer is batch based, you will need to send a batch of messages to the channel as by creating them manually.

For instance:

[source, java]
----
@ApplicationScoped
public class BeverageProcessor {
@Incoming("orders")
CompletionStage<Void> process(KafkaRecordBatch<String, Order> orders) {
System.out.println("Order received " + orders.getPayload().size());
return orders.ack();
}
}
----

[source, java]
----
import static org.awaitility.Awaitility.await;
@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {
@Inject
@Connector("smallrye-in-memory")
InMemoryConnector connector;
@Test
void testProcessOrder() {
InMemorySource<IncomingKafkaRecordBatch<String, Order>> ordersIn = connector.source("orders");
var committed = new AtomicBoolean(false); // <1>
var commitHandler = new KafkaCommitHandler() {
@Override
public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record) {
committed.set(true); // <2>
return null;
}
};
var failureHandler = new KafkaFailureHandler() {
@Override
public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reason, Metadata metadata) {
return null;
}
};
Order order = new Order();
order.setProduct("coffee");
order.setName("Coffee lover");
order.setOrderId("1234");
var record = new ConsumerRecord<>("topic", 0, 0, "key", order);
var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 1), List.of(record)));
var batch = new IncomingKafkaRecordBatch<>(
records, "kafka", 0, commitHandler, failureHandler, false, false); // <3>
ordersIn.send(batch);
await().until(committed::get); // <4>
}
}
----
<1> Create an `AtomicBoolean` to track if the batch has been committed.
<2> Update `committed` when the batch is committed.
<3> Create a `IncomingKafkaRecordBatch` with a single record.
<4> Wait until the batch is committed.

[IMPORTANT]
====
With in-memory channels we were able to test application code processing messages without starting a Kafka broker.
Expand Down

0 comments on commit 2ba044d

Please sign in to comment.