Skip to content

Commit

Permalink
Multi-language docs examples for client batch examples (#834)
Browse files Browse the repository at this point in the history
  • Loading branch information
wetted authored Aug 23, 2023
1 parent 4dfe2f5 commit 320f03c
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 26 deletions.
1 change: 0 additions & 1 deletion kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ dependencies {
testImplementation mnSerde.micronaut.serde.jackson
testImplementation mnRxjava2.micronaut.rxjava2
testImplementation mn.micronaut.http.client
testImplementation mnRxjava2.micronaut.rxjava2

testRuntimeOnly mnMicrometer.micronaut.micrometer.registry.statsd
testRuntimeOnly mnTracing.micronaut.tracing.core
Expand Down
32 changes: 7 additions & 25 deletions src/main/docs/guide/kafkaClient/kafkaClientBatch.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,27 @@ By default if you define a method that takes a container type such as a jdk:java
For example the following two methods will both send serialized arrays:

.Sending Arrays and Lists
[source,java]
----
@Topic("books")
void sendList(List<Book> books);

@Topic("books")
void sendBooks(Book...books);
----
snippet::io.micronaut.kafka.docs.consumer.batch.BookClient[tags="lists,arrays", indent = 0]

Instead of a sending a serialized array you may wish to instead send batches of link:{kafkaapi}/org/apache/kafka/clients/producer/ProducerRecord.html[ProducerRecord] either synchronously or asynchronously.

To do this you can specify a value of `true` to the `batch` member of the ann:configuration.kafka.annotation.KafkaClient[] annotation:

.Sending `ProducerRecord` batches
[source,java]
----
@KafkaClient(batch=true)
@Topic("books")
void send(List<Book> books);
----

snippet::io.micronaut.kafka.docs.consumer.batch.BookClient[tags="clazz,lists"]

In the above case instead of sending a serialized array the client implementation will iterate over each item in the list and send a `ProducerRecord` for each. The previous example is blocking, however you can return a reactive type if desired:

.Sending `ProducerRecord` batches Reactively
[source,java]
----
@KafkaClient(batch=true)
@Topic("books")
Flowable<RecordMetadata> send(List<Book> books);
----

snippet::io.micronaut.kafka.docs.consumer.batch.BookClient[tags="clazz,reactive"]

You can also use an unbound reactive type such as rx:Flowable[] as the source of your batch data:

.Sending `ProducerRecord` batches from a Flowable
[source,java]
----
@KafkaClient(batch=true)
@Topic("books")
Flowable<RecordMetadata> send(Flowable<Book> books);
----

snippet::io.micronaut.kafka.docs.consumer.batch.BookClient[tags="clazz,flowable"]


1 change: 1 addition & 0 deletions test-suite-groovy/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ dependencies {
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(mnReactor.micronaut.reactor)
testImplementation(mnSerde.micronaut.serde.jackson)
testImplementation(mnRxjava2.micronaut.rxjava2)
testImplementation(projects.micronautKafka)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.micronaut.kafka.docs.consumer.batch

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.reactivex.Flowable
import org.apache.kafka.clients.producer.RecordMetadata

@Requires(property = 'spec.name', value = 'BookListenerTest')
// tag::clazz[]
@KafkaClient(batch = true)
interface BookClient {
// end::clazz[]

// tag::lists[]
@Topic('books')
void sendList(List<Book> books)
// end::lists[]

// tag::arrays[]
@Topic('books')
void sendBooks(Book...books)
// end::arrays[]

// tag::reactive[]
@Topic('books')
Flowable<RecordMetadata> send(List<Book> books)
// end::reactive[]

// tag::flowable[]
@Topic('books')
Flowable<RecordMetadata> send(Flowable<Book> books)
// end::flowable[]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import groovy.util.logging.Slf4j
// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import reactor.core.publisher.Flux
// end::imports[]

@Requires(property = 'spec.name', value = 'BookListenerTest')
// tag::clazz[]
@KafkaListener(batch = true) // <1>
@Slf4j
Expand Down
1 change: 1 addition & 0 deletions test-suite-kotlin/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ dependencies {
testImplementation(libs.awaitility)
testImplementation(mnReactor.micronaut.reactor)
testImplementation(mnSerde.micronaut.serde.jackson)
testImplementation(mnRxjava2.micronaut.rxjava2)
testImplementation(projects.micronautKafka)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.micronaut.kafka.docs.consumer.batch

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.reactivex.Flowable
import org.apache.kafka.clients.producer.RecordMetadata

@Requires(property = "spec.name", value = "BookListenerTest")
// tag::clazz[]
@KafkaClient(batch = true)
interface BookClient {
// end::clazz[]

// tag::lists[]
@Topic("books")
fun sendList(books: List<Book>)
// end::lists[]

// tag::arrays[]
@Topic("books")
fun sendBooks(vararg books: Book)
// end::arrays[]

// tag::reactive[]
@Topic("books")
fun send(books: List<Book>): Flowable<RecordMetadata>
// end::reactive[]

// tag::flowable[]
@Topic("books")
fun send(books: Flowable<Book>): Flowable<RecordMetadata>
// end::flowable[]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.micronaut.kafka.docs.consumer.batch
// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata
Expand All @@ -12,6 +13,7 @@ import reactor.core.publisher.Flux
import java.util.*
// end::imports[]

@Requires(property = "spec.name", value = "BookListenerTest")
// tag::clazz[]
@KafkaListener(batch = true) // <1>
class BookListener {
Expand Down
1 change: 1 addition & 0 deletions test-suite/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ dependencies {
testImplementation(libs.awaitility)
testImplementation(mnReactor.micronaut.reactor)
testImplementation(mnSerde.micronaut.serde.jackson)
testImplementation(mnRxjava2.micronaut.rxjava2)
testImplementation(projects.micronautKafka)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.micronaut.kafka.docs.consumer.batch;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import io.reactivex.Flowable;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.List;

@Requires(property = "spec.name", value = "BookListenerTest")
// tag::clazz[]
@KafkaClient(batch = true)
public interface BookClient {
// end::clazz[]

// tag::lists[]
@Topic("books")
void sendList(List<Book> books);
// end::lists[]

// tag::arrays[]
@Topic("books")
void sendBooks(Book...books);
// end::arrays[]

// tag::reactive[]
@Topic("books")
Flowable<RecordMetadata> send(List<Book> books);
// end::reactive[]

// tag::flowable[]
@Topic("books")
Flowable<RecordMetadata> send(Flowable<Book> books);
// end::flowable[]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
Expand All @@ -16,6 +17,7 @@
import static org.slf4j.LoggerFactory.getLogger;
// end::imports[]

@Requires(property = "spec.name", value = "BatchBookListenerTest")
// tag::clazz[]
@KafkaListener(batch = true) // <1>
public class BookListener {
Expand Down

0 comments on commit 320f03c

Please sign in to comment.