Skip to content

Commit

Permalink
Doc changes for removing examples using Custom Message types.
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Mar 28, 2024
1 parent 8fbc2bc commit a634bc4
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 40 deletions.
25 changes: 14 additions & 11 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -615,9 +615,10 @@ public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRe
----
package inbound;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
Expand All @@ -628,7 +629,7 @@ public class KafkaRebalancedConsumer {
@Incoming("rebalanced-example")
@Acknowledgment(Acknowledgment.Strategy.NONE)
public CompletionStage<Void> consume(IncomingKafkaRecord<Integer, String> message) {
public CompletionStage<Void> consume(Message<ConsumerRecord<Integer, String>> message) {
// We don't need to ACK messages because in this example,
// we set offset during consumer rebalance
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -715,14 +716,14 @@ public void consume(List<Double> prices) {
}
----

The incoming method can also receive `Message<List<Payload>>`, `KafkaRecordBatch<Key, Payload>`, and `ConsumerRecords<Key, Payload>` types.
The incoming method can also receive `Message<List<Payload>>`, `Message<ConsumerRecords<Key, Payload>>`, and `ConsumerRecords<Key, Payload>` types.
They give access to record details such as offset or timestamp:

[source, java]
----
@Incoming("prices")
public CompletionStage<Void> consumeMessage(KafkaRecordBatch<String, Double> records) {
for (KafkaRecord<String, Double> record : records) {
public CompletionStage<Void> consumeMessage(Message<ConsumerRecords<String, Double>> records) {
for (ConsumerRecord<String, Double> record : records.getPayload()) {
String payload = record.getPayload();
String topic = record.getTopic();
// process messages
Expand Down Expand Up @@ -1540,13 +1541,15 @@ The following example includes a batch of Kafka records inside a transaction.
----
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
@ApplicationScoped
Expand All @@ -1557,10 +1560,10 @@ public class KafkaExactlyOnceProcessor {
KafkaTransactions<Integer> txProducer;
@Incoming("prices-in")
public Uni<Void> emitInTransaction(KafkaRecordBatch<String, Integer> batch) { // <1>
public Uni<Void> emitInTransaction(Message<ConsumerRecords<String, Integer>> batch) { // <1>
return txProducer.withTransactionAndAck(batch, emitter -> { // <2>
for (KafkaRecord<String, Integer> record : batch) {
emitter.send(KafkaRecord.of(record.getKey(), record.getPayload() + 1)); // <3>
for (ConsumerRecord<String, Integer> record : batch.getPayload()) {
emitter.send(KafkaRecord.of(record.key(), record.value() + 1)); // <3>
}
return Uni.createFrom().voidItem();
});
Expand All @@ -1571,7 +1574,7 @@ public class KafkaExactlyOnceProcessor {

<1> It is recommended to use exactly-once processing along with the batch consumption mode.
While it is possible to use it with a single Kafka message, it'll have a significant performance impact.
<2> The consumed `KafkaRecordBatch` message is passed to the `KafkaTransactions#withTransactionAndAck` in order to handle the offset commits and message acks.
<2> The consumed message is passed to the `KafkaTransactions#withTransactionAndAck` in order to handle the offset commits and message acks.
<3> The `send` method writes records to Kafka inside the transaction, without waiting for send receipt from the broker.
Messages pending to be written to Kafka will be buffered, and flushed before committing the transaction.
It is therefore recommended configuring the `@OnOverflow` `bufferSize` in order to fit enough messages, for example the `max.poll.records`, maximum amount of records returned in a batch.
Expand Down Expand Up @@ -1908,7 +1911,7 @@ Similarly, if you declare
[source,java]
----
@Incoming("my-kafka-records")
public void consume(KafkaRecord<Long, byte[]> record) {
public void consume(Record<Long, byte[]> record) {
...
}
----
Expand Down
56 changes: 27 additions & 29 deletions docs/src/main/asciidoc/pulsar.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -416,22 +416,20 @@ You can enable batch mode using `batchReceive=true` property, or setting a `batc
[source, java]
----
@Incoming("prices")
public CompletionStage<Void> consumeMessage(PulsarIncomingBatchMessage<Double> messages) {
for (PulsarMessage<Double> msg : messages) {
msg.getMetadata(PulsarIncomingMessageMetadata.class).ifPresent(metadata -> {
String key = metadata.getKey();
String topic = metadata.getTopicName();
long timestamp = metadata.getEventTime();
//... process messages
});
public CompletionStage<Void> consumeMessage(Message<org.apache.pulsar.client.api.Messages<Double>> messages) {
for (org.apache.pulsar.client.api.Message<Double> msg : messages.getPayload()) {
String key = msg.getKey();
String topic = msg.getTopicName();
long timestamp = msg.getEventTime();
//... process messages
}
// ack will commit the latest offsets (per partition) of the batch.
return messages.ack();
}
@Incoming("prices")
public void consumeRecords(Messages<Double> messages) {
for (Message<Double> msg : messages) {
public void consumeRecords(org.apache.pulsar.client.api.Messages<Double> messages) {
for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
//... process messages
}
}
Expand All @@ -457,7 +455,7 @@ You can configure batch mode explicitly with `mp.messaging.incoming.$channel.bat

== Sending messages to Pulsar

The Pulsar Connector can write Reactive Messaging Messages as Pulsar Message.
The Pulsar Connector can write Reactive Messaging `Message`s as Pulsar Message.

=== Example

Expand Down Expand Up @@ -739,32 +737,32 @@ package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.pulsar.client.api.Messages;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;
@ApplicationScoped
public class PulsarExactlyOnceProcessor {
@ApplicationScoped
public class PulsarExactlyOnceProcessor {
@Inject
@Channel("tx-out-example")
PulsarTransactions<Integer> txProducer;
@Incoming("in-channel")
public Uni<Void> emitInTransaction(PulsarIncomingBatchMessage<Integer> batch) {
return txProducer.withTransactionAndAck(batch, emitter -> {
for (PulsarMessage<Integer> record : batch) {
emitter.send(PulsarMessage.of(record.getPayload() + 1, record.getKey()));
}
return Uni.createFrom().voidItem();
});
}
@Inject
@Channel("tx-out-example")
PulsarTransactions<Integer> txProducer;
}
@Incoming("in-channel")
public Uni<Void> emitInTransaction(Message<Messages<Integer>> batch) {
return txProducer.withTransactionAndAck(batch, emitter -> {
for (org.apache.pulsar.client.api.Message<Integer> record : batch.getPayload()) {
emitter.send(PulsarMessage.of(record.getValue() + 1, record.getKey()));
}
return Uni.createFrom().voidItem();
});
}
}
----

If the processing completes successfully, the message is acknowledged inside the transaction and the transaction is committed.
Expand Down

0 comments on commit a634bc4

Please sign in to comment.