Skip to content

Commit

Permalink
Merge pull request #20137 from ozangunalp/kafka_in_memory_doc_update
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier authored Sep 14, 2021
2 parents 765f5f1 + 4afdb94 commit 35e3d3c
Showing 1 changed file with 42 additions and 13 deletions.
55 changes: 42 additions & 13 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,29 @@ To achieve this, you can _switch_ the channels managed by the Kafka connector to

IMPORTANT: This approach only works for JVM tests. It cannot be used for native tests (because they do not support injection).

First, add the following dependency to your application:
Let's say we want to test the following processor application:

[source, java]
----
@ApplicationScoped
public class BeverageProcessor {
@Incoming("orders")
@Outgoing("beverages")
Beverage process(Order order) {
System.out.println("Order received " + order.getProduct());
Beverage beverage = new Beverage();
beverage.setBeverage(order.getProduct());
beverage.setCustomer(order.getCustomer());
beverage.setOrderId(order.getOrderId());
beverage.setPreparationState("RECEIVED");
return beverage;
}
}
----

First, add the following test dependency to your application:

[source, xml]
----
Expand All @@ -1459,8 +1481,8 @@ public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLif
@Override
public Map<String, String> start() {
Map<String, String> env = new HashMap<>();
Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("orders"); // <1>
Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("queue"); // <2>
Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("orders"); // <1>
Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("beverages"); // <2>
env.putAll(props1);
env.putAll(props2);
return env; // <3>
Expand All @@ -1472,8 +1494,8 @@ public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLif
}
}
----
<1> Switch the incoming channel "orders" (expecting messages from Kafka) to in-memory.
<2> Switch the outgoing channel "queue" (writing messages to Kafka) to in-memory.
<1> Switch the incoming channel `orders` (expecting messages from Kafka) to in-memory.
<2> Switch the outgoing channel `beverages` (writing messages to Kafka) to in-memory.
<3> Builds and returns a `Map` containing all the properties required to configure the application to use in-memory channels.
<4> When the test stops, clear the `InMemoryConnector` (discard all the received and sent messages)

Expand All @@ -1490,19 +1512,19 @@ class BaristaTest {
@Test
void testProcessOrder() {
InMemorySource<Order> orders = connector.source("orders"); // <2>
InMemorySink<Beverage> queue = connector.sink("queue"); // <3>
InMemorySource<Order> ordersIn = connector.source("orders"); // <2>
InMemorySink<Beverage> beveragesOut = connector.sink("beverages"); // <3>
Order order = new Order();
order.setProduct("coffee");
order.setName("Coffee lover");
order.setOrderId("1234");
orders.send(order); // <4>
ordersIn.send(order); // <4>
await().<List<? extends Message<Beverage>>>until(queue::received, t -> t.size() == 1); // <5>
await().<List<? extends Message<Beverage>>>until(beveragesOut::received, t -> t.size() == 1); // <5>
Beverage queuedBeverage = queue.received().get(0).getPayload();
Beverage queuedBeverage = beveragesOut.received().get(0).getPayload();
Assertions.assertEquals(Beverage.State.READY, queuedBeverage.getPreparationState());
Assertions.assertEquals("coffee", queuedBeverage.getBeverage());
Assertions.assertEquals("Coffee lover", queuedBeverage.getCustomer());
Expand All @@ -1513,9 +1535,16 @@ class BaristaTest {
----
<1> Inject the in-memory connector in your test class.
<2> Retrieve the incoming channel (`orders`) - the channel must have been switched to in-memory in the test resource.
<3> Retrieve the outgoing channel (`queue`) - the channel must have been switched to in-memory in the test resource.
<4> Use the `send` method to send a message to the `orders` channel. So, the application will process this message.
<5> Use the `received` method to check the messages produced by the application.
<3> Retrieve the outgoing channel (`beverages`) - the channel must have been switched to in-memory in the test resource.
<4> Use the `send` method to send a message to the `orders` channel.
The application will process this message and send a message to `beverages` channel.
<5> Use the `received` method on `beverages` channel to check the messages produced by the application.

[IMPORTANT]
====
With in-memory channels we were able to test application code processing messages without starting a Kafka broker.
Note that different in-memory channels are independent, and switching channel connector to in-memory does not simulate message delivery between channels configured to the same Kafka topic.
====

=== Starting Kafka in a test resource

Expand Down

0 comments on commit 35e3d3c

Please sign in to comment.