Skip to content

Commit

Permalink
Replace key&value tuple with ConsumerRecord in consumeWith (#620)
Browse files Browse the repository at this point in the history
Co-authored-by: svroonland <[email protected]>
Co-authored-by: Jules Ivanic <[email protected]>
  • Loading branch information
3 people authored Feb 26, 2023
1 parent 98a1883 commit 2d1ddd1
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package zio.kafka

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import zio._
import zio.kafka.admin._
Expand Down Expand Up @@ -161,7 +161,7 @@ object KafkaTestUtils {
) ++ ZLayer.succeed(diagnostics)) >>> Consumer.live

def consumeWithStrings[RC](clientId: String, groupId: Option[String] = None, subscription: Subscription)(
r: (String, String) => URIO[Any, Unit]
r: ConsumerRecord[String, String] => URIO[Any, Unit]
): RIO[Kafka, Unit] =
consumerSettings(clientId, groupId, None).flatMap { settings =>
Consumer.consumeWith[Any, Any, String, String](
Expand Down
6 changes: 3 additions & 3 deletions zio-kafka-test/src/test/scala/zio/kafka/ConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ object ConsumerSpec extends ZIOKafkaSpec {
client <- randomClient
subscription = Subscription.Topics(Set(topic))
_ <- produceMany(topic, messages)
consumeResult <- consumeWithStrings(client, Some(group), subscription) { case (_, _) =>
consumeResult <- consumeWithStrings(client, Some(group), subscription) { _ =>
ZIO.die(new IllegalArgumentException("consumeWith failure"))
}.exit
} yield consumeResult.foldExit[TestResult](
Expand Down Expand Up @@ -517,9 +517,9 @@ object ConsumerSpec extends ZIOKafkaSpec {
messagesReceived: Ref[List[(String, String)]],
done: Promise[Nothing, Unit]
) =
consumeWithStrings(client, Some(group), subscription)({ (key, value) =>
consumeWithStrings(client, Some(group), subscription)({ record =>
for {
messagesSoFar <- messagesReceived.updateAndGet(_ :+ (key -> value))
messagesSoFar <- messagesReceived.updateAndGet(_ :+ (record.key() -> record.value()))
_ <- ZIO.when(messagesSoFar.size == nrMessages)(done.succeed(()))
} yield ()
}).fork
Expand Down
17 changes: 9 additions & 8 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetAndTimestamp }
import org.apache.kafka.clients.consumer.{ ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp }
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition }
import zio._
import zio.kafka.serde.Deserializer
Expand Down Expand Up @@ -98,7 +98,7 @@ trait Consumer {
valueDeserializer: Deserializer[R, V],
commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
)(
f: (K, V) => URIO[R1, Unit]
f: ConsumerRecord[K, V] => URIO[R1, Unit]
): ZIO[R & R1, Throwable, Unit]

def subscribe(subscription: Subscription): Task[Unit]
Expand Down Expand Up @@ -269,7 +269,7 @@ object Consumer {
valueDeserializer: Deserializer[R, V],
commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
)(
f: (K, V) => URIO[R1, Unit]
f: ConsumerRecord[K, V] => URIO[R1, Unit]
): ZIO[R & R1, Throwable, Unit] =
for {
r <- ZIO.environment[R & R1]
Expand All @@ -280,7 +280,7 @@ object Consumer {
.flatMapPar(Int.MaxValue, bufferSize = settings.perPartitionChunkPrefetch) {
case (_, partitionStream) =>
partitionStream.mapChunksZIO(_.mapZIO { case CommittableRecord(record, offset) =>
f(record.key(), record.value()).as(offset)
f(record).as(offset)
})
}
}
Expand Down Expand Up @@ -456,9 +456,9 @@ object Consumer {
* val settings: ConsumerSettings = ???
* val subscription = Subscription.Topics(Set("my-kafka-topic"))
*
* val consumerIO = Consumer.consumeWith(settings, subscription, Serdes.string, Serdes.string) { case (key, value) =>
* val consumerIO = Consumer.consumeWith(settings, subscription, Serdes.string, Serdes.string) { record =>
* // Process the received record here
* putStrLn(s"Received record: \${key}: \${value}")
* putStrLn(s"Received record: \${record.key()}: \${record.value()}")
* }
* }}}
*
Expand All @@ -473,7 +473,8 @@ object Consumer {
* @param commitRetryPolicy
* Retry commits that failed due to a RetriableCommitFailedException according to this schedule
* @param f
* Function that returns the effect to execute for each message. It is passed the key and value
* Function that returns the effect to execute for each message. It is passed the
* [[org.apache.kafka.clients.consumer.ConsumerRecord]].
* @tparam R
* Environment for the consuming effect
* @tparam R1
Expand All @@ -491,7 +492,7 @@ object Consumer {
keyDeserializer: Deserializer[R, K],
valueDeserializer: Deserializer[R, V],
commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3)
)(f: (K, V) => URIO[R1, Unit]): RIO[R & R1, Unit] =
)(f: ConsumerRecord[K, V] => URIO[R1, Unit]): RIO[R & R1, Unit] =
ZIO.scoped[R & R1] {
Consumer
.make(settings)
Expand Down

0 comments on commit 2d1ddd1

Please sign in to comment.