Add a README and rename the streaming methods (#26)
iravid authored Sep 27, 2019
1 parent 0d6d55b commit 9fc3d00
Showing 4 changed files with 99 additions and 7 deletions.
README.md (92 additions, 0 deletions)
@@ -0,0 +1,92 @@
[![Release Artifacts][Badge-SonatypeReleases]][Link-SonatypeReleases]

# Welcome to ZIO Kafka

ZIO Kafka provides a purely functional, streams-based interface to the
Kafka client. It integrates effortlessly with ZIO and ZIO Streams.

## Quickstart

Add the following dependencies to your `build.sbt` file:
```scala
libraryDependencies ++= Seq(
  "dev.zio" %% "zio-streams" % "1.0.0-RC13",
  "dev.zio" %% "zio-kafka"   % "<version>"
)
```

Somewhere in your application, configure the `zio.kafka.client.ConsumerSettings`
data type:
```scala
import zio._, zio.duration._
import zio.kafka.client._

val consumerSettings: ConsumerSettings =
  ConsumerSettings(
    bootstrapServers = List("localhost:9092"),
    groupId = "group",
    clientId = "client",
    closeTimeout = 30.seconds,
    extraDriverSettings = Map(),
    pollInterval = 250.millis,
    pollTimeout = 50.millis,
    perPartitionChunkPrefetch = 2
  )
```

And use it to create a consumer:
```scala
import zio.blocking.Blocking, zio.clock.Clock
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde

val consumer: ZManaged[Clock with Blocking, Throwable, Consumer[Array[Byte], Array[Byte]]] =
  Consumer.make(consumerSettings)(new ByteArraySerde, new ByteArraySerde)
```

ZIO Kafka currently relies on an implicit
`org.apache.kafka.common.serialization.Serde` being available for both
the key type and value type of the messages consumed. This will change
in future releases in favour of a more explicit approach that does not
rely on the Kafka client's built-in de/serialization mechanism.
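
For example, here is a minimal sketch that brings an implicit
`Serde[String]` into scope (using the serdes bundled with the Kafka
client) and creates a consumer for `String` keys and values:
```scala
import org.apache.kafka.common.serialization.{ Serde, Serdes }

// A single implicit Serde[String] covers both the key and the value type.
implicit val stringSerde: Serde[String] = Serdes.String()

val stringConsumer: ZManaged[Clock with Blocking, Throwable, Consumer[String, String]] =
  Consumer.make[String, String](consumerSettings)
```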

The consumer returned from `Consumer.make` is wrapped in a `ZManaged`
to ensure its proper release. To get access to it, use the `ZManaged#use` method:
```scala
consumer.use { c =>
  // Consumer now available as `c`
  ZIO.unit
}
```

You may stream data from Kafka using the `subscribe` and
`plainStream` methods; `ZStream.unwrap` ensures the record stream is
only constructed once the subscription effect has completed:
```scala
import zio.console.putStrLn
import zio.stream._

consumer.use { c =>
  val records =
    ZStream.unwrap(
      c.subscribe(Subscription.Topics(Set("topic")))
        .as(c.plainStream.tap(record => putStrLn(record.toString)).chunks)
    )

  records.runDrain
}
```
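
As another sketch (mirroring the project's test suite), a bounded
number of records can be collected into a list rather than drained
indefinitely:
```scala
consumer.use { c =>
  for {
    _       <- c.subscribe(Subscription.Topics(Set("topic")))
    // Flatten the chunked stream, take five records and collect them.
    records <- c.plainStream.flattenChunks.take(5).runCollect
  } yield records.map(r => (r.record.key, r.record.value))
}
```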

If you need to distinguish between the different partitions assigned
to the consumer, you may use the `Consumer#partitionedStream` method,
which creates a nested stream of partitions:
```scala
def partitionedStream: ZStream[Clock with Blocking,
                               Throwable,
                               (TopicPartition, ZStreamChunk[Any, Throwable, CommittableRecord[K, V]])]
```
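
As a sketch (assuming the same setup and placeholder topic name as the
`plainStream` example above), each partition can then be consumed as
its own stream, in parallel, using `flatMapPar` and `flattenChunks`:
```scala
consumer.use { c =>
  val partitioned =
    ZStream.unwrap(
      c.subscribe(Subscription.Topics(Set("topic")))
        .as(
          // Process all assigned partitions concurrently.
          c.partitionedStream.flatMapPar(Int.MaxValue) {
            case (topicPartition, partitionStream) =>
              partitionStream.flattenChunks.tap { record =>
                putStrLn(s"Partition ${topicPartition.partition}: $record")
              }
          }
        )
    )

  partitioned.runDrain
}
```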

## Getting help

Please feel free to use the [Gitter](https://gitter.im/zio/zio-kafka)
channel for any and all questions you may have.

[Link-SonatypeReleases]: https://oss.sonatype.org/content/repositories/releases/dev/zio/zio-kafka_2.12/ "Sonatype Releases"
[Badge-SonatypeReleases]: https://img.shields.io/nexus/r/https/oss.sonatype.org/dev.zio/zio-kafka_2.12.svg "Sonatype Releases"
src/main/scala/zio/kafka/client/Consumer.scala (3 additions, 3 deletions)
@@ -41,7 +41,7 @@ class Consumer[K, V] private (
   ): BlockingTask[Map[TopicPartition, OffsetAndTimestamp]] =
     consumer.withConsumer(_.offsetsForTimes(timestamps.mapValues(Long.box).asJava, timeout.asJava).asScala.toMap)
 
-  def partitioned
+  def partitionedStream
     : ZStream[Clock with Blocking, Throwable, (TopicPartition, ZStreamChunk[Any, Throwable, CommittableRecord[K, V]])] =
     ZStream
       .fromQueue(runloop.deps.partitions)
@@ -61,8 +61,8 @@ class Consumer[K, V] private (
   def position(partition: TopicPartition, timeout: Duration = Duration.Infinity): BlockingTask[Long] =
     consumer.withConsumer(_.position(partition, timeout.asJava))
 
-  def plain: ZStreamChunk[Clock with Blocking, Throwable, CommittableRecord[K, V]] =
-    ZStreamChunk(partitioned.flatMapPar(Int.MaxValue)(_._2.chunks))
+  def plainStream: ZStreamChunk[Clock with Blocking, Throwable, CommittableRecord[K, V]] =
+    ZStreamChunk(partitionedStream.flatMapPar(Int.MaxValue)(_._2.chunks))
 
   def seek(partition: TopicPartition, offset: Long): BlockingTask[Unit] =
     consumer.withConsumer(_.seek(partition, offset))
src/test/scala/zio/kafka/client/ConsumerTest.scala (3 additions, 3 deletions)
@@ -49,7 +49,7 @@ class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with Defa
         _       <- consumer.subscribe(Subscription.Topics(Set("topic150")))
         kvs     <- ZIO((1 to 5).toList.map(i => (s"key$i", s"msg$i")))
         _       <- produceMany("topic150", kvs)
-        records <- consumer.plain.flattenChunks.take(5).runCollect
+        records <- consumer.plainStream.flattenChunks.take(5).runCollect
         _ <- ZIO.effectTotal(records.map { r =>
               (r.record.key, r.record.value)
             } shouldEqual kvs)
@@ -65,7 +65,7 @@ class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with Defa
       firstResults <- Consumer.make[String, String](settings("group1", "first")).use { consumer =>
                        for {
                          _ <- consumer.subscribe(Subscription.Topics(Set("topic1")))
-                         results <- consumer.partitioned
+                         results <- consumer.partitionedStream
                                      .filter(_._1 == new TopicPartition("topic1", 0))
                                      .flatMap(_._2.flattenChunks)
                                      .take(5)
@@ -84,7 +84,7 @@ class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with Defa
      secondResults <- Consumer.make[String, String](settings("group1", "second")).use { consumer =>
                        for {
                          _ <- consumer.subscribe(Subscription.Topics(Set("topic1")))
-                         results <- consumer.partitioned
+                         results <- consumer.partitionedStream
                                     .flatMap(_._2.flattenChunks)
                                     .take(5)
                                     .transduce(ZSink.collectAll[CommittableRecord[String, String]])
src/test/scala/zio/kafka/client/ProducerTest.scala (1 addition, 1 deletion)
@@ -72,7 +72,7 @@ class ProducerTest extends WordSpecLike with Matchers with LazyLogging with Defa
             1
           )
         )
-        .flatMap(c => c.subscribe(subscription).toManaged_ *> c.plain.toQueue())
+        .flatMap(c => c.subscribe(subscription).toManaged_ *> c.plainStream.toQueue())
 
       for {
         outcome <- producer.produceChunk(chunks).either
