Add Consumer.consumeWith: a simple API for consuming messages effectfully (#22)

* Upgrade to zio 1.0.0-RC13

* Working consumeM function

* Doc improvements

* Address code review feedback

* Add usage example

* Some tests for `consumeWith`

* Beginning of test that produces messages to several partitions

* Test for failure

* Fix merge errors
svroonland authored and iravid committed Sep 28, 2019
1 parent ea0a88a commit ddbed58
Showing 3 changed files with 165 additions and 5 deletions.
60 changes: 60 additions & 0 deletions src/main/scala/zio/kafka/client/Consumer.scala
@@ -99,4 +99,64 @@ object Consumer {
)
runloop <- Runloop(deps)
} yield new Consumer(wrapper, settings, runloop)

/**
* Execute an effect for each record and commit the offset after processing
*
* This method is the easiest way of processing messages on a Kafka topic.
*
* Messages on a single partition are processed sequentially, while the processing of
* multiple partitions happens in parallel.
*
 * Messages are processed with 'at least once' semantics: it is not guaranteed that every message
 * processed by the effect has a corresponding offset commit before stream termination, so a
 * message may be processed more than once.
*
* Offsets are committed after execution of the effect. They are batched when a commit action is in progress
* to avoid backpressuring the stream.
*
 * The effect must absorb any failures. Failures should be handled by retries or by ignoring the
 * error, in which case the Kafka message is skipped.
*
* Usage example:
*
* {{{
* val settings: ConsumerSettings = ???
* val subscription = Subscription.Topics(Set("my-kafka-topic"))
*
 * val consumerIO = Consumer.consumeWith[Environment, String, String](subscription, settings) { case (key, value) =>
* // Process the received record here
* putStrLn(s"Received record: ${key}: ${value}")
* }
* }}}
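 *
 * Since the passed effect may not fail, a minimal sketch of a retry-then-skip policy for absorbing
 * failures (`processRecord` is a hypothetical effect that can fail; it is not part of this library):
 *
 * {{{
 * Consumer.consumeWith[Environment, String, String](subscription, settings) { (key, value) =>
 *   processRecord(key, value)      // hypothetical effect failing with a Throwable
 *     .retry(Schedule.recurs(3))   // retry transient failures a few times
 *     .ignore                      // then skip the message rather than fail the stream
 * }
 * }}}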
*
 * @param subscription Topic subscription parameters
 * @param settings Settings for creating a [[Consumer]]
* @param f Function that returns the effect to execute for each message. It is passed the key and value
* @tparam K Type of keys (an implicit [[Serde]] should be in scope)
* @tparam V Type of values (an implicit [[Serde]] should be in scope)
* @return Effect that completes with a unit value only when interrupted. May fail when the [[Consumer]] fails.
*/
def consumeWith[R, K: Serde, V: Serde](
subscription: Subscription,
settings: ConsumerSettings
)(f: (K, V) => ZIO[R, Nothing, Unit]): ZIO[R with Clock with Blocking, Throwable, Unit] =
ZStream
.managed(Consumer.make[K, V](settings))
.flatMap { consumer =>
ZStream
.fromEffect(consumer.subscribe(subscription))
.flatMap { _ =>
consumer.partitionedStream
.flatMapPar(Int.MaxValue, outputBuffer = settings.perPartitionChunkPrefetch) {
case (_, partitionStream) =>
partitionStream.mapM {
case CommittableRecord(record, offset) =>
f(record.key(), record.value()).as(offset)
}.flattenChunks
}
}
}
.aggregate(ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ merge _))
.mapM(_.commit)
.runDrain
}
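
For intuition, a short sketch (not part of this commit) of the batching idea behind the aggregate step above: per-record offsets are folded into a single OffsetBatch, so one commit can cover many records. It assumes Offset and OffsetBatch live in zio.kafka.client, as this repository's file layout suggests.

    import zio.kafka.client.{ Offset, OffsetBatch }

    // Mirrors aggregate(ZSink.foldLeft(OffsetBatch.empty)(_ merge _)) followed by
    // mapM(_.commit): merge all offsets into one batch, then issue a single commit.
    def commitAll(offsets: List[Offset]) =
      offsets.foldLeft(OffsetBatch.empty)(_ merge _).commit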
104 changes: 101 additions & 3 deletions src/test/scala/zio/kafka/client/ConsumerTest.scala
@@ -3,16 +3,16 @@ package zio.kafka.client
import com.typesafe.scalalogging.LazyLogging
import net.manub.embeddedkafka.EmbeddedKafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ Serde, Serdes }
import org.scalatest.{ Matchers, WordSpecLike }
import org.scalatest.{ EitherValues, Matchers, WordSpecLike }
import zio._
import zio.blocking.Blocking
import zio.clock.Clock
import zio.duration._
import org.apache.kafka.common.TopicPartition
import zio.stream.ZSink

class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with DefaultRuntime {
class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with DefaultRuntime with EitherValues {
import KafkaTestUtils._

def pause(): ZIO[Clock, Nothing, Unit] = UIO(()).delay(2.seconds).forever
@@ -102,5 +102,103 @@ class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with DefaultRuntime {
} yield (firstResults ++ secondResults).map(rec => rec.key() -> rec.value()) shouldEqual data
}
}

"consuming using consumeWith" should {
"consume all the messages on a topic" in unsafeRun {
val topic = "consumeWith"
val subscription = Subscription.Topics(Set(topic))
val nrMessages = 50
val nrPartitions = 5

for {
// Produce messages on several partitions
_ <- ZIO.effectTotal(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions))
_ <- ZIO.traverse(1 to nrMessages) { i =>
produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i"))
}

// Consume messages
done <- Promise.make[Nothing, Unit]
messagesReceived <- Ref.make(List.empty[(String, String)])
fib <- Consumer
.consumeWith[Environment, String, String](
subscription,
settings("group3", "client3")
) { (key, value) =>
(for {
messagesSoFar <- messagesReceived.update(_ :+ (key -> value))
_ <- Task.when(messagesSoFar.size == nrMessages)(done.succeed(()))
} yield ()).orDie
}
.fork
_ <- done.await
_ <- fib.interrupt
_ <- fib.join.ignore
} yield succeed
}

"commit offsets for all consumed messages" in unsafeRun {
val topic = "consumeWith2"
val subscription = Subscription.Topics(Set(topic))
val nrMessages = 50
val messages = (1 to nrMessages).toList.map(i => (s"key$i", s"msg$i"))

for {
done <- Promise.make[Nothing, Unit]
messagesReceived <- Ref.make(List.empty[(String, String)])
_ <- produceMany(topic, messages)
fib <- Consumer
.consumeWith[Environment, String, String](
subscription,
settings("group3", "client3")
) { (key, value) =>
(for {
messagesSoFar <- messagesReceived.update(_ :+ (key -> value))
_ <- Task.when(messagesSoFar.size == nrMessages)(done.succeed(()))
} yield ()).orDie
}
.fork
_ <- done.await *> ZIO.sleep(3.seconds) // TODO the sleep is necessary for the outstanding commits to be flushed. Maybe we can fix that another way
_ <- fib.interrupt
_ <- fib.join.ignore
_ <- produceOne(topic, "key-new", "msg-new")
newMessage <- Consumer.make[String, String](settings("group3", "client3")).use { c =>
c.subscribe(subscription) *> c.plainStream
.take(1)
.flattenChunks
.map(r => (r.record.key(), r.record.value()))
.run(ZSink.collectAll[(String, String)])
.map(_.head)
}
consumedMessages <- messagesReceived.get
} yield consumedMessages shouldNot contain(newMessage)
}

"fail when the consuming effect produces a failure" in unsafeRun {
val topic = "consumeWith3"
val subscription = Subscription.Topics(Set(topic))
val nrMessages = 10
val messages = (1 to nrMessages).toList.map(i => (s"key$i", s"msg$i"))

for {
messagesReceived <- Ref.make(List.empty[(String, String)])
_ <- produceMany(topic, messages)
fib <- Consumer
.consumeWith[Environment, String, String](
subscription,
settings("group3", "client3")
) { (key, value) =>
(for {
messagesSoFar <- messagesReceived.update(_ :+ (key -> value))
_ <- Task.when(messagesSoFar.size == 3)(
ZIO.die(new IllegalArgumentException("consumeWith failure"))
)
} yield ()).orDie
}
.fork
testResult <- fib.map(_ => fail("Expected consumeWith to fail")).orElse(Fiber.succeed(succeed)).join
} yield testResult
}
}
}
}
6 changes: 4 additions & 2 deletions src/test/scala/zio/kafka/client/KafkaTestUtils.scala
@@ -13,8 +13,10 @@ object KafkaTestUtils {
EmbeddedKafka.publishToKafka(t, k, m)
}

def produceMany(t: String, kvs: List[(String, String)]): UIO[Unit] =
UIO.foreach(kvs)(i => produceOne(t, i._1, i._2)).unit
def produceMany(t: String, kvs: List[(String, String)]): UIO[Unit] = ZIO.effectTotal {
import net.manub.embeddedkafka.Codecs._
EmbeddedKafka.publishToKafka(t, kvs)
}

def produceMany(topic: String, partition: Int, kvs: List[(String, String)]) =
ZIO.effectTotal {
