Skip to content

Commit

Permalink
Fix outdated documentation for kafka batch listeners (#814)
Browse files Browse the repository at this point in the history
  • Loading branch information
guillermocalvo authored Aug 16, 2023
1 parent ccca481 commit e6c52ac
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -37,21 +38,18 @@ public Flux<Book> receiveFlux(Flux<Book> books) {

// tag::manual[]
@Topic("all-the-books")
public void receive(List<Book> books,
List<Long> offsets,
List<Integer> partitions,
List<String> topics,
Consumer kafkaConsumer) { // <1>
public void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) { // <1>

for (int i = 0; i < books.size(); i++) {
for (int i = 0; i < records.size(); i++) {
ConsumerRecord<String, Book> record = records.get(i); // <2>

// process the book
Book book = books.get(i); // <2>
Book book = record.value();

// commit offsets
String topic = topics.get(i);
int partition = partitions.get(i);
long offset = offsets.get(i); // <3>
String topic = record.topic();
int partition = record.partition();
long offset = record.offset(); // <3>

kafkaConsumer.commitSync(Collections.singletonMap( // <4>
new TopicPartition(topic, partition),
Expand Down
2 changes: 1 addition & 1 deletion src/main/docs/guide/kafkaListener/kafkaListenerBatch.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ You can also take more control of committing offsets when doing batch processing
include::{testskafka}/consumer/batch/BookListener.java[tags=manual, indent=0]
----

<1> The method receives the batch of records as well as the offsets, partitions and topics
<1> The method receives the batch of books as a list of consumer records
<2> Each record is processed
<3> The offset, partition and topic is read for the record
<4> Offsets are committed
Expand Down

0 comments on commit e6c52ac

Please sign in to comment.