From 2d1ddd1832908e603dd19aa7db0512ca6cb16c8e Mon Sep 17 00:00:00 2001 From: Wojciech Date: Sun, 26 Feb 2023 19:39:50 +0100 Subject: [PATCH] Replace key&value tuple with ConsumerRecord in consumeWith (#620) Co-authored-by: svroonland Co-authored-by: Jules Ivanic --- .../main/scala/zio/kafka/KafkaTestUtils.scala | 4 ++-- .../src/test/scala/zio/kafka/ConsumerSpec.scala | 6 +++--- .../scala/zio/kafka/consumer/Consumer.scala | 17 +++++++++-------- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala b/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala index c6f3bc654..1538f6f8a 100644 --- a/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala +++ b/zio-kafka-test-utils/src/main/scala/zio/kafka/KafkaTestUtils.scala @@ -1,6 +1,6 @@ package zio.kafka -import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata } import zio._ import zio.kafka.admin._ @@ -161,7 +161,7 @@ object KafkaTestUtils { ) ++ ZLayer.succeed(diagnostics)) >>> Consumer.live def consumeWithStrings[RC](clientId: String, groupId: Option[String] = None, subscription: Subscription)( - r: (String, String) => URIO[Any, Unit] + r: ConsumerRecord[String, String] => URIO[Any, Unit] ): RIO[Kafka, Unit] = consumerSettings(clientId, groupId, None).flatMap { settings => Consumer.consumeWith[Any, Any, String, String]( diff --git a/zio-kafka-test/src/test/scala/zio/kafka/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/ConsumerSpec.scala index 0f9136562..44e73014d 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/ConsumerSpec.scala @@ -284,7 +284,7 @@ object ConsumerSpec extends ZIOKafkaSpec { client <- randomClient subscription = Subscription.Topics(Set(topic)) _ <- produceMany(topic, messages) - consumeResult <- consumeWithStrings(client, Some(group), subscription) { case (_, _) => + consumeResult <- consumeWithStrings(client, Some(group), subscription) { _ => ZIO.die(new IllegalArgumentException("consumeWith failure")) }.exit } yield consumeResult.foldExit[TestResult]( @@ -517,9 +517,9 @@ object ConsumerSpec extends ZIOKafkaSpec { messagesReceived: Ref[List[(String, String)]], done: Promise[Nothing, Unit] ) = - consumeWithStrings(client, Some(group), subscription)({ (key, value) => + consumeWithStrings(client, Some(group), subscription)({ record => for { - messagesSoFar <- messagesReceived.updateAndGet(_ :+ (key -> value)) + messagesSoFar <- messagesReceived.updateAndGet(_ :+ (record.key() -> record.value())) _ <- ZIO.when(messagesSoFar.size == nrMessages)(done.succeed(())) } yield () }).fork diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index da58fb6b3..e25085078 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -1,6 +1,6 @@ package zio.kafka.consumer -import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetAndTimestamp } +import org.apache.kafka.clients.consumer.{ ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp } import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition } import zio._ import zio.kafka.serde.Deserializer @@ -98,7 +98,7 @@ trait Consumer { valueDeserializer: Deserializer[R, V], commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3) )( - f: (K, V) => URIO[R1, Unit] + f: ConsumerRecord[K, V] => URIO[R1, Unit] ): ZIO[R & R1, Throwable, Unit] def subscribe(subscription: Subscription): Task[Unit] @@ -269,7 +269,7 @@ object Consumer { valueDeserializer: Deserializer[R, V], commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3) )( - f: (K, V) => URIO[R1, Unit] + f: ConsumerRecord[K, V] => URIO[R1, Unit] ): ZIO[R & R1, Throwable, Unit] = for { r <- ZIO.environment[R & R1] @@ -280,7 +280,7 @@ object Consumer { .flatMapPar(Int.MaxValue, bufferSize = settings.perPartitionChunkPrefetch) { case (_, partitionStream) => partitionStream.mapChunksZIO(_.mapZIO { case CommittableRecord(record, offset) => - f(record.key(), record.value()).as(offset) + f(record).as(offset) }) } } @@ -456,9 +456,9 @@ object Consumer { * val settings: ConsumerSettings = ??? * val subscription = Subscription.Topics(Set("my-kafka-topic")) * - * val consumerIO = Consumer.consumeWith(settings, subscription, Serdes.string, Serdes.string) { case (key, value) => + * val consumerIO = Consumer.consumeWith(settings, subscription, Serdes.string, Serdes.string) { record => * // Process the received record here - * putStrLn(s"Received record: \${key}: \${value}") + * putStrLn(s"Received record: \${record.key()}: \${record.value()}") * } * }}} * @@ -473,7 +473,8 @@ object Consumer { * @param commitRetryPolicy * Retry commits that failed due to a RetriableCommitFailedException according to this schedule * @param f - * Function that returns the effect to execute for each message. It is passed the key and value + * Function that returns the effect to execute for each message. It is passed the + * [[org.apache.kafka.clients.consumer.ConsumerRecord]]. * @tparam R * Environment for the consuming effect * @tparam R1 @@ -491,7 +492,7 @@ object Consumer { keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], commitRetryPolicy: Schedule[Any, Any, Any] = Schedule.exponential(1.second) && Schedule.recurs(3) - )(f: (K, V) => URIO[R1, Unit]): RIO[R & R1, Unit] = + )(f: ConsumerRecord[K, V] => URIO[R1, Unit]): RIO[R & R1, Unit] = ZIO.scoped[R & R1] { Consumer .make(settings)