From 9fc3d005a9f582fa10334b4d3f3f5a380fed4aba Mon Sep 17 00:00:00 2001
From: Itamar Ravid
Date: Fri, 27 Sep 2019 23:17:40 +0300
Subject: [PATCH] Add a README and rename the streaming methods (#26)

---
 README.md                                     | 92 +++++++++++++++++++
 .../scala/zio/kafka/client/Consumer.scala     |  6 +-
 .../scala/zio/kafka/client/ConsumerTest.scala |  6 +-
 .../scala/zio/kafka/client/ProducerTest.scala |  2 +-
 4 files changed, 99 insertions(+), 7 deletions(-)
 create mode 100644 README.md

diff --git a/README.md b/README.md
new file mode 100644
index 000000000..32c5493cc
--- /dev/null
+++ b/README.md
@@ -0,0 +1,92 @@
+[![Release Artifacts][Badge-SonatypeReleases]][Link-SonatypeReleases]
+
+# Welcome to ZIO Kafka
+
+ZIO Kafka provides a purely functional, streams-based interface to the
+Kafka client. It integrates effortlessly with ZIO and ZIO Streams.
+
+## Quickstart
+
+Add the following dependencies to your `build.sbt` file:
+```scala
+libraryDependencies ++= Seq(
+  "dev.zio" %% "zio-streams" % "1.0.0-RC13",
+  "dev.zio" %% "zio-kafka"   % "<version>"
+)
+```
+
+Somewhere in your application, configure the `zio.kafka.client.ConsumerSettings`
+data type:
+```scala
+import zio._, zio.duration._
+import zio.kafka.client._
+
+val consumerSettings: ConsumerSettings =
+  ConsumerSettings(
+    bootstrapServers = List("localhost:9092"),
+    groupId = "group",
+    clientId = "client",
+    closeTimeout = 30.seconds,
+    extraDriverSettings = Map(),
+    pollInterval = 250.millis,
+    pollTimeout = 50.millis,
+    perPartitionChunkPrefetch = 2
+  )
+```
+
+And use it to create a consumer:
+```scala
+import zio.blocking.Blocking, zio.clock.Clock
+import org.apache.kafka.common.serialization.Serdes.ByteArraySerde
+
+val consumer: ZManaged[Clock with Blocking, Throwable, Consumer[Array[Byte], Array[Byte]]] =
+  Consumer.make(consumerSettings)(new ByteArraySerde, new ByteArraySerde)
+```
+
+ZIO Kafka currently relies on an implicit
+`org.apache.kafka.common.serialization.Serde` being available for both
+the key type and value type of the messages consumed. This will change
+in future releases in favour of a more explicit approach that does not
+rely on the Kafka client's built-in de/serialization mechanism.
+
+The consumer returned from `Consumer.make` is wrapped in a `ZManaged`
+to ensure its proper release. To get access to it, use the `ZManaged#use` method:
+```scala
+consumer.use { c =>
+  // Consumer now available as `c`
+  ZIO.unit
+}
+```
+
+You may stream data from Kafka using the `subscribe` and
+`plainStream` methods:
+```scala
+import zio.console.putStrLn
+import zio.stream._
+
+consumer.use { c =>
+  val records =
+    ZStream.unwrap(
+      c.subscribe(Subscription.Topics(Set("topic")))
+        .as(c.plainStream.tap(record => putStrLn(record.toString)).chunks)
+    )
+
+  records.runDrain
+}
+```
+
+If you need to distinguish between the different partitions assigned
+to the consumer, you may use the `Consumer#partitionedStream` method,
+which creates a nested stream of partitions:
+```scala
+def partitionedStream: ZStream[Clock with Blocking,
+                               Throwable,
+                               (TopicPartition, ZStreamChunk[Any, Throwable, CommittableRecord[K, V]])]
+```
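+
+For example, here is a minimal sketch of consuming all partitions in
+parallel while tagging each record with its originating partition. It
+assumes the same `consumer` and imports as the examples above; the
+topic name and printed format are purely illustrative:
+```scala
+consumer.use { c =>
+  val tagged =
+    ZStream.unwrap(
+      c.subscribe(Subscription.Topics(Set("topic")))
+        .as(
+          c.partitionedStream.flatMapPar(Int.MaxValue) {
+            case (tp, partition) =>
+              // Flatten the partition's chunks and note which
+              // partition each record came from.
+              partition.flattenChunks.tap { record =>
+                putStrLn(s"${tp.topic}-${tp.partition}: ${record.record.value}")
+              }
+          }
+        )
+    )
+
+  tagged.runDrain
+}
+```
+
+Because `flatMapPar` merges the per-partition streams, records from
+different partitions are interleaved in the output, but each
+partition's records are still consumed in order.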
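+
+The records produced by these streams are `CommittableRecord`s. As a
+sketch of committing offsets after processing (this assumes that
+`CommittableRecord` exposes its offset through an `offset` field whose
+`commit` method yields an effect, as in later releases; committing
+record-by-record is illustrative rather than efficient):
+```scala
+consumer.use { c =>
+  val processedAndCommitted =
+    ZStream.unwrap(
+      c.subscribe(Subscription.Topics(Set("topic")))
+        .as(
+          c.plainStream
+            .tap(record => putStrLn(s"Processing: ${record.record.value}"))
+            // Commit each record's offset once it has been processed.
+            .tap(record => record.offset.commit)
+            .chunks
+        )
+    )
+
+  processedAndCommitted.runDrain
+}
+```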
+
+## Getting help
+
+Please feel free to use the [Gitter](https://gitter.im/zio/zio-kafka)
+channel for any and all questions you may have.
+
+[Link-SonatypeReleases]: https://oss.sonatype.org/content/repositories/releases/dev/zio/zio-kafka_2.12/ "Sonatype Releases"
+[Badge-SonatypeReleases]: https://img.shields.io/nexus/r/https/oss.sonatype.org/dev.zio/zio-kafka_2.12.svg "Sonatype Releases"
diff --git a/src/main/scala/zio/kafka/client/Consumer.scala b/src/main/scala/zio/kafka/client/Consumer.scala
index 890422b11..8517b116a 100644
--- a/src/main/scala/zio/kafka/client/Consumer.scala
+++ b/src/main/scala/zio/kafka/client/Consumer.scala
@@ -41,7 +41,7 @@ class Consumer[K, V] private (
   ): BlockingTask[Map[TopicPartition, OffsetAndTimestamp]] =
     consumer.withConsumer(_.offsetsForTimes(timestamps.mapValues(Long.box).asJava, timeout.asJava).asScala.toMap)
 
-  def partitioned
+  def partitionedStream
     : ZStream[Clock with Blocking, Throwable, (TopicPartition, ZStreamChunk[Any, Throwable, CommittableRecord[K, V]])] =
     ZStream
       .fromQueue(runloop.deps.partitions)
@@ -61,8 +61,8 @@ class Consumer[K, V] private (
   def position(partition: TopicPartition, timeout: Duration = Duration.Infinity): BlockingTask[Long] =
     consumer.withConsumer(_.position(partition, timeout.asJava))
 
-  def plain: ZStreamChunk[Clock with Blocking, Throwable, CommittableRecord[K, V]] =
-    ZStreamChunk(partitioned.flatMapPar(Int.MaxValue)(_._2.chunks))
+  def plainStream: ZStreamChunk[Clock with Blocking, Throwable, CommittableRecord[K, V]] =
+    ZStreamChunk(partitionedStream.flatMapPar(Int.MaxValue)(_._2.chunks))
 
   def seek(partition: TopicPartition, offset: Long): BlockingTask[Unit] =
     consumer.withConsumer(_.seek(partition, offset))
diff --git a/src/test/scala/zio/kafka/client/ConsumerTest.scala b/src/test/scala/zio/kafka/client/ConsumerTest.scala
index 8155f4a1a..56e4e23f1 100644
--- a/src/test/scala/zio/kafka/client/ConsumerTest.scala
+++ b/src/test/scala/zio/kafka/client/ConsumerTest.scala
@@ -49,7 +49,7 @@ class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with Defa
         _       <- consumer.subscribe(Subscription.Topics(Set("topic150")))
         kvs     <- ZIO((1 to 5).toList.map(i => (s"key$i", s"msg$i")))
         _       <- produceMany("topic150", kvs)
-        records <- consumer.plain.flattenChunks.take(5).runCollect
+        records <- consumer.plainStream.flattenChunks.take(5).runCollect
         _ <- ZIO.effectTotal(records.map { r =>
               (r.record.key, r.record.value)
             } shouldEqual kvs)
@@ -65,7 +65,7 @@ class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with Defa
        firstResults <- Consumer.make[String, String](settings("group1", "first")).use { consumer =>
                          for {
                            _ <- consumer.subscribe(Subscription.Topics(Set("topic1")))
-                           results <- consumer.partitioned
+                           results <- consumer.partitionedStream
                                        .filter(_._1 == new TopicPartition("topic1", 0))
                                        .flatMap(_._2.flattenChunks)
                                        .take(5)
@@ -84,7 +84,7 @@ class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with Defa
        secondResults <- Consumer.make[String, String](settings("group1", "second")).use { consumer =>
                           for {
                             _ <- consumer.subscribe(Subscription.Topics(Set("topic1")))
-                            results <- consumer.partitioned
+                            results <- consumer.partitionedStream
                                         .flatMap(_._2.flattenChunks)
                                         .take(5)
                                         .transduce(ZSink.collectAll[CommittableRecord[String, String]])
diff --git a/src/test/scala/zio/kafka/client/ProducerTest.scala b/src/test/scala/zio/kafka/client/ProducerTest.scala
index 30c0142d6..f1a77124b 100644
--- a/src/test/scala/zio/kafka/client/ProducerTest.scala
+++ b/src/test/scala/zio/kafka/client/ProducerTest.scala
@@ -72,7 +72,7 @@ class ProducerTest extends WordSpecLike with Matchers with LazyLogging with Defa
                 1
               )
             )
-            .flatMap(c => c.subscribe(subscription).toManaged_ *> c.plain.toQueue())
+            .flatMap(c => c.subscribe(subscription).toManaged_ *> c.plainStream.toQueue())
 
       for {
         outcome <- producer.produceChunk(chunks).either