-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
High-level API for consuming messages with an effect #22
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
.aggregate(ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ merge _)) | ||
.mapM(_.commit) | ||
.tap(_ => putStrLn(s"Commit done")) | ||
.run(ZSink.drain) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.run(ZSink.drain) | |
.runDrain |
} | ||
.aggregate(ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ merge _)) | ||
.mapM(_.commit) | ||
.tap(_ => putStrLn(s"Commit done")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No printlns :-)
def consumeM[R, K: Serde, V: Serde]( | ||
subscription: Subscription, | ||
settings: ConsumerSettings | ||
)(f: (K, V) => ZIO[R, Nothing, Unit]): ZIO[R with Clock with Blocking with Console, Throwable, Unit] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
)(f: (K, V) => ZIO[R, Nothing, Unit]): ZIO[R with Clock with Blocking with Console, Throwable, Unit] = | |
)(f: (K, V) => ZIO[R, Nothing, Unit]): ZIO[R with Clock with Blocking, Throwable, Unit] = |
* The effect should must absorb any failures. Failures should be handled by retries or ignoring the | ||
* error, which will result in the Kafka message being skipped. | ||
* | ||
* Usage example: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An example would be nice, yeah!
* @tparam V Type of values (an implicit [[Serde]] should be in scope) | ||
* @return Effect that completes with a unit value only when interrupted. May fail when the [[Consumer]] fails. | ||
*/ | ||
def consumeM[R, K: Serde, V: Serde]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have a version of this that takes a (K, V) => Unit
, and we probably won't have. So let's call this consumeWith
.
.flatMap { _ => | ||
consumer.partitioned | ||
.flatMapPar(Int.MaxValue, outputBuffer = settings.perPartitionChunkPrefetch) { | ||
case (partition @ _, partitionStream) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case (partition @ _, partitionStream) => | |
case (_, partitionStream) => |
* @tparam V Type of values (an implicit [[Serde]] should be in scope) | ||
* @return Effect that completes with a unit value only when interrupted. May fail when the [[Consumer]] fails. | ||
*/ | ||
def consumeM[R, K: Serde, V: Serde]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not happy about the R
type parameter here, but since K
and V
need to be specified, I don't see how to get rid of R
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, the consuming function could require any R
, so it is more correct to be polymorphic in the dependencies used (in addition to Blocking and Clock).
Ok, let's just add a test and this should be good to go! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @svroonland!
See docs. This provides a very easy, high-level API to apply an effect to each message, while benefitting from parallelism and automatic committing after processing.