High-level API for consuming messages with an effect #22

Merged · 11 commits · Sep 28, 2019
60 changes: 60 additions & 0 deletions src/main/scala/zio/kafka/client/Consumer.scala
@@ -99,4 +99,64 @@ object Consumer {
)
runloop <- Runloop(deps)
} yield new Consumer(wrapper, settings, runloop)

/**
* Execute an effect for each record and commit the offset after processing
*
* This method is the easiest way of processing messages on a Kafka topic.
*
* Messages on a single partition are processed sequentially, while the processing of
* multiple partitions happens in parallel.
*
* Messages are processed with 'at least once' semantics: it is not guaranteed that every message
* processed by the effect has a corresponding offset commit before stream termination, so a
* message may be processed again after a restart.
*
* Offsets are committed after execution of the effect. They are batched when a commit action is in progress
* to avoid backpressuring the stream.
*
* The effect must absorb any failures: handle them by retrying or by ignoring the error,
* which results in the Kafka message being skipped.
*
* Usage example:
Member: An example would be nice, yeah!
*
* {{{
* val settings: ConsumerSettings = ???
* val subscription = Subscription.Topics(Set("my-kafka-topic"))
*
* val consumerIO = Consumer.consumeWith[Environment, String, String](subscription, settings) { (key, value) =>
* // Process the received record here
* putStrLn(s"Received record: ${key}: ${value}")
* }
* }}}
*
* @param subscription Topic subscription parameters
* @param settings Settings for creating a [[Consumer]]
* @param f Function that returns the effect to execute for each message; it is passed the key and value
* @tparam K Type of keys (an implicit [[Serde]] should be in scope)
* @tparam V Type of values (an implicit [[Serde]] should be in scope)
* @return Effect that completes with a unit value only when interrupted. May fail when the [[Consumer]] fails.
*/
def consumeWith[R, K: Serde, V: Serde](
subscription: Subscription,
settings: ConsumerSettings
)(f: (K, V) => ZIO[R, Nothing, Unit]): ZIO[R with Clock with Blocking, Throwable, Unit] =
ZStream
.managed(Consumer.make[K, V](settings))
.flatMap { consumer =>
ZStream
.fromEffect(consumer.subscribe(subscription))
.flatMap { _ =>
consumer.partitionedStream
.flatMapPar(Int.MaxValue, outputBuffer = settings.perPartitionChunkPrefetch) {
case (_, partitionStream) =>
partitionStream.mapM {
case CommittableRecord(record, offset) =>
f(record.key(), record.value()).as(offset)
}.flattenChunks
}
}
}
.aggregate(ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ merge _))
.mapM(_.commit)
.runDrain
}
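Since the handler passed to `consumeWith` must return `ZIO[R, Nothing, Unit]`, callers absorb failures themselves. Below is a minimal sketch of the retry-then-skip approach the scaladoc suggests; `process` is a hypothetical, possibly-failing handler, and implicit `Serde[String]` instances are assumed to be in scope:

import zio._
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console._

// Hypothetical per-record handler that may fail, e.g. a database write
def process(key: String, value: String): Task[Unit] = ???

val settings: ConsumerSettings = ???
val subscription = Subscription.Topics(Set("my-kafka-topic"))

// Retry each record a few times; on repeated failure, log and skip it
// so the stream keeps running
val consumerIO: ZIO[Clock with Console with Blocking, Throwable, Unit] =
  Consumer.consumeWith[Clock with Console, String, String](subscription, settings) { (key, value) =>
    process(key, value)
      .retry(Schedule.recurs(3))
      .orElse(putStrLn(s"Skipping record $key after repeated failures"))
  }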
104 changes: 101 additions & 3 deletions src/test/scala/zio/kafka/client/ConsumerTest.scala
@@ -3,16 +3,16 @@ package zio.kafka.client
import com.typesafe.scalalogging.LazyLogging
import net.manub.embeddedkafka.EmbeddedKafka
import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ Serde, Serdes }
-import org.scalatest.{ Matchers, WordSpecLike }
+import org.scalatest.{ EitherValues, Matchers, WordSpecLike }
import zio._
import zio.blocking.Blocking
import zio.clock.Clock
import zio.duration._
-import org.apache.kafka.common.TopicPartition
+import zio.stream.ZSink

-class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with DefaultRuntime {
+class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with DefaultRuntime with EitherValues {
import KafkaTestUtils._

def pause(): ZIO[Clock, Nothing, Unit] = UIO(()).delay(2.seconds).forever
@@ -102,5 +102,103 @@ class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with DefaultRuntime with EitherValues {
} yield (firstResults ++ secondResults).map(rec => rec.key() -> rec.value()) shouldEqual data
}
}

"consuming using consumeWith" should {
"consume all the messages on a topic" in unsafeRun {
val topic = "consumeWith"
val subscription = Subscription.Topics(Set(topic))
val nrMessages = 50
val nrPartitions = 5

for {
// Produce messages on several partitions
_ <- ZIO.effectTotal(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions))
_ <- ZIO.traverse(1 to nrMessages) { i =>
produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i"))
}

// Consume messages
done <- Promise.make[Nothing, Unit]
messagesReceived <- Ref.make(List.empty[(String, String)])
fib <- Consumer
.consumeWith[Environment, String, String](
subscription,
settings("group3", "client3")
) { (key, value) =>
(for {
messagesSoFar <- messagesReceived.update(_ :+ (key -> value))
_ <- Task.when(messagesSoFar.size == nrMessages)(done.succeed(()))
} yield ()).orDie
}
.fork
_ <- done.await
_ <- fib.interrupt
_ <- fib.join.ignore
} yield succeed
}

"commit offsets for all consumed messages" in unsafeRun {
val topic = "consumeWith2"
val subscription = Subscription.Topics(Set(topic))
val nrMessages = 50
val messages = (1 to nrMessages).toList.map(i => (s"key$i", s"msg$i"))

for {
done <- Promise.make[Nothing, Unit]
messagesReceived <- Ref.make(List.empty[(String, String)])
_ <- produceMany(topic, messages)
fib <- Consumer
.consumeWith[Environment, String, String](
subscription,
settings("group3", "client3")
) { (key, value) =>
(for {
messagesSoFar <- messagesReceived.update(_ :+ (key -> value))
_ <- Task.when(messagesSoFar.size == nrMessages)(done.succeed(()))
} yield ()).orDie
}
.fork
_ <- done.await *> ZIO.sleep(3.seconds) // TODO the sleep is necessary for the outstanding commits to be flushed. Maybe we can fix that another way
_ <- fib.interrupt
_ <- fib.join.ignore
_ <- produceOne(topic, "key-new", "msg-new")
newMessage <- Consumer.make[String, String](settings("group3", "client3")).use { c =>
c.subscribe(subscription) *> c.plainStream
.take(1)
.flattenChunks
.map(r => (r.record.key(), r.record.value()))
.run(ZSink.collectAll[(String, String)])
.map(_.head)
}
consumedMessages <- messagesReceived.get
} yield consumedMessages shouldNot contain(newMessage)
}

"fail when the consuming effect produces a failure" in unsafeRun {
val topic = "consumeWith3"
val subscription = Subscription.Topics(Set(topic))
val nrMessages = 10
val messages = (1 to nrMessages).toList.map(i => (s"key$i", s"msg$i"))

for {
messagesReceived <- Ref.make(List.empty[(String, String)])
_ <- produceMany(topic, messages)
fib <- Consumer
.consumeWith[Environment, String, String](
subscription,
settings("group3", "client3")
) { (key, value) =>
(for {
messagesSoFar <- messagesReceived.update(_ :+ (key -> value))
_ <- Task.when(messagesSoFar.size == 3)(
ZIO.die(new IllegalArgumentException("consumeWith failure"))
)
} yield ()).orDie
}
.fork
testResult <- fib.map(_ => fail("Expected consumeWith to fail")).orElse(Fiber.succeed(succeed)).join
} yield testResult
}
}
}
}
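All three tests share the same coordination shape: `consumeWith` never completes on its own, so each test forks it, signals through a `Promise` once the expected number of records has accumulated in a `Ref`, and then interrupts the fiber. A stripped-down sketch of that shape, assuming `subscription`, `settings`, `nrMessages` and implicit `Serde[String]` instances as in the tests above:

for {
  done     <- Promise.make[Nothing, Unit]
  received <- Ref.make(List.empty[(String, String)])
  fib      <- Consumer
                .consumeWith[Any, String, String](subscription, settings) { (key, value) =>
                  received
                    .update(_ :+ (key -> value)) // this Ref.update returns the updated list
                    .flatMap(msgs => ZIO.when(msgs.size == nrMessages)(done.succeed(())))
                }
                .fork
  _        <- done.await    // all expected messages have been seen
  _        <- fib.interrupt // consumeWith would otherwise run forever
  msgs     <- received.get
} yield msgs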
6 changes: 4 additions & 2 deletions src/test/scala/zio/kafka/client/KafkaTestUtils.scala
@@ -13,8 +13,10 @@ object KafkaTestUtils {
EmbeddedKafka.publishToKafka(t, k, m)
}

-def produceMany(t: String, kvs: List[(String, String)]): UIO[Unit] =
-  UIO.foreach(kvs)(i => produceOne(t, i._1, i._2)).unit
+def produceMany(t: String, kvs: List[(String, String)]): UIO[Unit] = ZIO.effectTotal {
+  import net.manub.embeddedkafka.Codecs._
+  EmbeddedKafka.publishToKafka(t, kvs)
+}

def produceMany(topic: String, partition: Int, kvs: List[(String, String)]) =
ZIO.effectTotal {
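The reworked `produceMany` hands the whole list to a single `publishToKafka` call instead of sequencing one `produceOne` effect per record; since embedded-kafka presumably sets up a producer per publish call, this should make producing the fifty-odd test messages noticeably cheaper.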