High-level API for consuming messages with an effect #22

Merged: 11 commits, Sep 28, 2019
6 changes: 3 additions & 3 deletions build.sbt
@@ -27,9 +27,9 @@ developers := List(
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")

libraryDependencies ++= Seq(
"dev.zio" %% "zio-streams" % "1.0.0-RC12-1",
"dev.zio" %% "zio-test" % "1.0.0-RC12-1" % "test",
"dev.zio" %% "zio-test-sbt" % "1.0.0-RC12-1" % "test",
"dev.zio" %% "zio-streams" % "1.0.0-RC13",
"dev.zio" %% "zio-test" % "1.0.0-RC13" % "test",
"dev.zio" %% "zio-test-sbt" % "1.0.0-RC13" % "test",
"org.apache.kafka" % "kafka-clients" % "2.3.0",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"io.github.embeddedkafka" %% "embedded-kafka" % "2.3.0" % "test",
51 changes: 51 additions & 0 deletions src/main/scala/zio/kafka/client/Consumer.scala
@@ -6,6 +6,7 @@ import zio.blocking.Blocking
import zio._
import zio.stream._
import zio.clock.Clock
import zio.console._

import scala.collection.JavaConverters._

@@ -47,4 +48,54 @@ object Consumer {
)
runloop <- Runloop(deps)
} yield new Consumer(wrapper, settings, runloop)

/**
* Execute an effect for each record and commit the offset after processing
*
* This method is the easiest way of processing messages on a Kafka topic.
*
* Messages on a single partition are processed sequentially, while the processing of
* multiple partitions happens in parallel.
*
* Messages are processed with 'at least once' semantics: it is not guaranteed that every message
* processed by the effect has a corresponding offset commit before stream termination, so a
* message may be processed more than once.
*
* Offsets are batched and committed after execution of the effect.
*
* The effect must absorb any failures. Failures should be handled by retries or by ignoring the
* error, which will result in the Kafka message being skipped.
*
* Usage example:
Member: An example would be nice, yeah!

*
* @param settings Settings for creating a [[Consumer]]
* @param subscription Topic subscription parameters
* @param f Function that returns the effect to execute for each message. It is passed the key and value
* @tparam K Type of keys (an implicit [[Serde]] should be in scope)
* @tparam V Type of values (an implicit [[Serde]] should be in scope)
* @return Effect that completes with a unit value only when interrupted. May fail when the [[Consumer]] fails.
*/
def consumeM[R, K: Serde, V: Serde](
Member: We don't have a version of this that takes a (K, V) => Unit, and we probably won't. So let's call this consumeWith.

Collaborator Author: I'm not happy about the R type parameter here, but since K and V need to be specified, I don't see how to get rid of R.

Member: Well, the consuming function could require any R, so it is more correct to be polymorphic in the dependencies used (in addition to Blocking and Clock).

subscription: Subscription,
settings: ConsumerSettings
)(f: (K, V) => ZIO[R, Nothing, Unit]): ZIO[R with Clock with Blocking with Console, Throwable, Unit] =
Member (suggested change):
-  )(f: (K, V) => ZIO[R, Nothing, Unit]): ZIO[R with Clock with Blocking with Console, Throwable, Unit] =
+  )(f: (K, V) => ZIO[R, Nothing, Unit]): ZIO[R with Clock with Blocking, Throwable, Unit] =

ZStream
.managed(Consumer.make[K, V](settings))
.flatMap { consumer =>
ZStream
.fromEffect(consumer.subscribe(subscription))
.flatMap { _ =>
consumer.partitioned
.flatMapPar(Int.MaxValue, outputBuffer = settings.perPartitionChunkPrefetch) {
case (partition @ _, partitionStream) =>
Member (suggested change):
-  case (partition @ _, partitionStream) =>
+  case (_, partitionStream) =>

partitionStream.mapM {
case CommittableRecord(record, offset) =>
f(record.key(), record.value()).as(offset)
}.flattenChunks
}
}
}
.aggregate(ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ merge _))
.mapM(_.commit)
.tap(_ => putStrLn(s"Commit done"))
Member: No printlns :-)

.run(ZSink.drain)
Member (suggested change):
-  .run(ZSink.drain)
+  .runDrain

}
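A reviewer asks above for a usage example of this method. A minimal, hedged sketch of calling `consumeM` as defined in this PR might look like the following; the topic name, the `Subscription.Topics` constructor, and the import paths are assumptions for illustration, and `settings` is presumed to be built elsewhere:

```scala
import zio._
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console.Console
import zio.kafka.client.{ Consumer, ConsumerSettings, Subscription } // assumed package layout

// Sketch only: assumes implicit Serde[String] instances are in scope and
// that `settings: ConsumerSettings` is constructed elsewhere.
def processAll(settings: ConsumerSettings): ZIO[Clock with Blocking with Console, Throwable, Unit] =
  Consumer.consumeM[Any, String, String](
    Subscription.Topics(Set("my-topic")), // hypothetical subscription value
    settings
  ) { (key, value) =>
    // The handler's error type is Nothing, so failures must be absorbed here.
    ZIO.effectTotal(println(s"$key -> $value"))
  }
```

Note that choosing `R = Any` collapses the environment to `Clock with Blocking with Console`, matching the method's declared return type in this revision.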
2 changes: 1 addition & 1 deletion src/main/scala/zio/kafka/client/Runloop.scala
@@ -161,7 +161,7 @@ object Runloop {
deps.requests,
deps.commits
)
-      .fold(State.initial[K, V])(_ => true) { (state, cmd) =>
+      .foldWhileM(State.initial[K, V])(_ => true) { (state, cmd) =>
def doCommit(cmds: List[Command.Commit[K, V]]) =
for {
runtime <- ZIO.runtime[Any]
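The switch from `fold` to `foldWhileM` keeps the run loop folding effectfully over the stream of commands for as long as the predicate on the accumulated state holds. A small self-contained sketch of the `foldWhileM` shape, using the RC13-era API as it appears in the diff (plain `Int`s stand in for the run loop's `State` and `Command` types, which are assumptions here):

```scala
import zio._
import zio.stream._

// foldWhileM(initial)(continuePredicate)(effectfulStep): fold over the stream,
// stopping as soon as the predicate on the accumulated state fails.
val sumUntilSix: ZIO[Any, Nothing, Int] =
  ZStream(1, 2, 3, 4, 5)
    .foldWhileM(0)(_ < 6)((acc, n) => ZIO.succeed(acc + n))

// With `_ => true`, as in the run loop above, folding continues until the
// command stream itself terminates.
```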