diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index 60509c7c861b3..a9d105aa244ab 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -615,9 +615,10 @@ public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRe ---- package inbound; -import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.eclipse.microprofile.reactive.messaging.Acknowledgment; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; import jakarta.enterprise.context.ApplicationScoped; import java.util.concurrent.CompletableFuture; @@ -628,7 +629,7 @@ public class KafkaRebalancedConsumer { @Incoming("rebalanced-example") @Acknowledgment(Acknowledgment.Strategy.NONE) - public CompletionStage consume(IncomingKafkaRecord message) { + public CompletionStage consume(Message> message) { // We don't need to ACK messages because in this example, // we set offset during consumer rebalance return CompletableFuture.completedFuture(null); @@ -715,14 +716,14 @@ public void consume(List prices) { } ---- -The incoming method can also receive `Message>`, `KafkaRecordBatch`, and `ConsumerRecords` types. +The incoming method can also receive `Message>`, `Message>`, and `ConsumerRecords` types. They give access to record details such as offset or timestamp: [source, java] ---- @Incoming("prices") -public CompletionStage consumeMessage(KafkaRecordBatch records) { - for (KafkaRecord record : records) { +public CompletionStage consumeMessage(Message> records) { + for (ConsumerRecord record : records.getPayload()) { String payload = record.getPayload(); String topic = record.getTopic(); // process messages @@ -1540,13 +1541,15 @@ The following example includes a batch of Kafka records inside a transaction. ---- import jakarta.enterprise.context.ApplicationScoped; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.OnOverflow; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.kafka.KafkaRecord; -import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch; import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions; @ApplicationScoped @@ -1557,10 +1560,10 @@ public class KafkaExactlyOnceProcessor { KafkaTransactions txProducer; @Incoming("prices-in") - public Uni emitInTransaction(KafkaRecordBatch batch) { // <1> + public Uni emitInTransaction(Message> batch) { // <1> return txProducer.withTransactionAndAck(batch, emitter -> { // <2> - for (KafkaRecord record : batch) { - emitter.send(KafkaRecord.of(record.getKey(), record.getPayload() + 1)); // <3> + for (ConsumerRecord record : batch.getPayload()) { + emitter.send(KafkaRecord.of(record.key(), record.value() + 1)); // <3> } return Uni.createFrom().voidItem(); }); @@ -1571,7 +1574,7 @@ public class KafkaExactlyOnceProcessor { <1> It is recommended to use exactly-once processing along with the batch consumption mode. While it is possible to use it with a single Kafka message, it'll have a significant performance impact. -<2> The consumed `KafkaRecordBatch` message is passed to the `KafkaTransactions#withTransactionAndAck` in order to handle the offset commits and message acks. +<2> The consumed message is passed to the `KafkaTransactions#withTransactionAndAck` in order to handle the offset commits and message acks. <3> The `send` method writes records to Kafka inside the transaction, without waiting for send receipt from the broker. Messages pending to be written to Kafka will be buffered, and flushed before committing the transaction. It is therefore recommended configuring the `@OnOverflow` `bufferSize` in order to fit enough messages, for example the `max.poll.records`, maximum amount of records returned in a batch. @@ -1908,7 +1911,7 @@ Similarly, if you declare [source,java] ---- @Incoming("my-kafka-records") -public void consume(KafkaRecord record) { +public void consume(Record record) { ... } ---- diff --git a/docs/src/main/asciidoc/pulsar.adoc b/docs/src/main/asciidoc/pulsar.adoc index 5ebbe6c71f993..eb2e86d2ad85a 100644 --- a/docs/src/main/asciidoc/pulsar.adoc +++ b/docs/src/main/asciidoc/pulsar.adoc @@ -416,22 +416,20 @@ You can enable batch mode using `batchReceive=true` property, or setting a `batc [source, java] ---- @Incoming("prices") -public CompletionStage consumeMessage(PulsarIncomingBatchMessage messages) { - for (PulsarMessage msg : messages) { - msg.getMetadata(PulsarIncomingMessageMetadata.class).ifPresent(metadata -> { - String key = metadata.getKey(); - String topic = metadata.getTopicName(); - long timestamp = metadata.getEventTime(); - //... process messages - }); +public CompletionStage consumeMessage(Message> messages) { + for (org.apache.pulsar.client.api.Message msg : messages.getPayload()) { + String key = msg.getKey(); + String topic = msg.getTopicName(); + long timestamp = msg.getEventTime(); + //... process messages } // ack will commit the latest offsets (per partition) of the batch. return messages.ack(); } @Incoming("prices") -public void consumeRecords(Messages messages) { - for (Message msg : messages) { +public void consumeRecords(org.apache.pulsar.client.api.Messages messages) { + for (org.apache.pulsar.client.api.Message msg : messages) { //... process messages } } @@ -457,7 +455,7 @@ You can configure batch mode explicitly with `mp.messaging.incoming.$channel.bat == Sending messages to Pulsar -The Pulsar Connector can write Reactive Messaging Messages as Pulsar Message. +The Pulsar Connector can write Reactive Messaging `Message`s as Pulsar Message. === Example @@ -739,32 +737,32 @@ package pulsar.outbound; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +import org.apache.pulsar.client.api.Messages; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; import io.smallrye.mutiny.Uni; -import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage; -import io.smallrye.reactive.messaging.pulsar.PulsarMessage; import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions; -@ApplicationScoped -public class PulsarExactlyOnceProcessor { + @ApplicationScoped + public class PulsarExactlyOnceProcessor { - @Inject - @Channel("tx-out-example") - PulsarTransactions txProducer; - - @Incoming("in-channel") - public Uni emitInTransaction(PulsarIncomingBatchMessage batch) { - return txProducer.withTransactionAndAck(batch, emitter -> { - for (PulsarMessage record : batch) { - emitter.send(PulsarMessage.of(record.getPayload() + 1, record.getKey())); - } - return Uni.createFrom().voidItem(); - }); - } + @Inject + @Channel("tx-out-example") + PulsarTransactions txProducer; -} + @Incoming("in-channel") + public Uni emitInTransaction(Message> batch) { + return txProducer.withTransactionAndAck(batch, emitter -> { + for (org.apache.pulsar.client.api.Message record : batch.getPayload()) { + emitter.send(PulsarMessage.of(record.getValue() + 1, record.getKey())); + } + return Uni.createFrom().voidItem(); + }); + } + + } ---- If the processing completes successfully, the message is acknowledged inside the transaction and the transaction is committed.