diff --git a/README.md b/README.md index 75cafeb65..64e245a82 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,8 @@ consumer.use { c => .plainStream(Serde.string, Serde.string) .flattenChunks .tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}")) - .mapM(_.offset.commit) + .aggregate(Consumer.offsetBatches) + .mapM(_.commit) .runDrain } ``` @@ -96,7 +97,8 @@ consumer.use { c => .tap(tpAndStr => putStrLn(s"topic: ${tpAndStr._1.topic}, partition: ${tpAndStr._1.partition}")) .flatMap(_._2.flattenChunks) .tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}")) - .mapM(_.offset.commit) + .aggregate(Consumer.offsetBatches) + .mapM(_.commit) .runDrain } ``` diff --git a/src/main/scala/zio/kafka/client/Consumer.scala b/src/main/scala/zio/kafka/client/Consumer.scala index 8cf698f0d..f4591260e 100644 --- a/src/main/scala/zio/kafka/client/Consumer.scala +++ b/src/main/scala/zio/kafka/client/Consumer.scala @@ -101,6 +101,9 @@ class Consumer private ( } object Consumer { + val offsetBatches: ZSink[Any, Nothing, Nothing, Offset, OffsetBatch] = + ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ merge _) + def make( settings: ConsumerSettings, diagnostics: Diagnostics = Diagnostics.NoOp @@ -180,7 +183,7 @@ object Consumer { } } } - .aggregate(ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ merge _)) + .aggregate(offsetBatches) .mapM(_.commit) .runDrain }