diff --git a/docs/annotatedTests.md b/docs/annotatedTests.md
new file mode 100644
index 000000000..ac6e0c44d
--- /dev/null
+++ b/docs/annotatedTests.md
@@ -0,0 +1,146 @@
+# Annotated Tests
+
+By way of examples, we present some annotated tests from the test suite.
+
+Tests are written using zio-test.
+
+## General Notes
+
+The tests make use of KafkaTestUtils.scala, which provides a number of helper methods
+for testing zio-kafka. You may find it useful to copy this file into your own test
+folder for writing your Kafka-based tests (there is no zio-test-utils project
+at present). Relevant portions of KafkaTestUtils will be introduced as we work
+through the tests.
+
+## First Producer Test
+
+```scala
+object ProducerTest
+    extends DefaultRunnableSpec(
+      suite("consumer test suite")(
+        testM("one record") {
+          withProducerStrings { producer =>
+            for {
+              _ <- producer.produce(new ProducerRecord("topic", "boo", "baa"))
+            } yield assertCompletes
+          }
+        }
+      ).provideManagedShared(KafkaTestUtils.embeddedKafkaEnvironment)
+    )
+```
+
+First, note the .provideManagedShared. This gives the tests a Kafka.Service
+added to a full TestEnvironment (this is needed because we want to provide both
+the Live clock and the Kafka service).
+
+### Kafka.Service
+
+This follows the module pattern (see the main zio docs):
+
+```scala
+object Kafka {
+  trait Service {
+    def bootstrapServers: List[String]
+    def stop(): UIO[Unit]
+  }
+
+  case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends Kafka.Service {
+    override def bootstrapServers: List[String] = List(s"localhost:${embeddedK.config.kafkaPort}")
+    override def stop(): UIO[Unit]              = ZIO.effectTotal(embeddedK.stop(true))
+  }
+
+  case object DefaultLocal extends Kafka.Service {
+    override def bootstrapServers: List[String] = List(s"localhost:9092")
+
+    override def stop(): UIO[Unit] = UIO.unit
+  }
+
+  val makeEmbedded: Managed[Nothing, Kafka] =
+    ZManaged.make(ZIO.effectTotal(new Kafka {
+      override val kafka: Service = EmbeddedKafkaService(EmbeddedKafka.start())
+    }))(_.kafka.stop())
+
+  val makeLocal: Managed[Nothing, Kafka] =
+    ZManaged.make(ZIO.effectTotal(new Kafka {
+      override val kafka: Service = DefaultLocal
+    }))(_.kafka.stop())
+}
+```
+
+There are in fact two provided implementations of the service. The first is for the unit
+tests and makes use of [EmbeddedKafka](https://github.com/embeddedkafka/embedded-kafka).
+
+The second uses the default local port and is suitable for a stand-alone Kafka
+(I used a Docker installation). You could create your own Kafka.Service for testing
+against remote servers, as sketched below (but security configuration would need to be added).
+
+Note the use of ZManaged to ensure the service is also stopped when the tests complete.
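+
+For example, a service for a remote cluster might look like the following sketch
+(RemoteKafkaService, makeRemote and the host name are made up for illustration, and
+any SSL/SASL security configuration is omitted):
+
+```scala
+case class RemoteKafkaService(servers: List[String]) extends Kafka.Service {
+  // The cluster's lifecycle is managed elsewhere, so there is nothing to start...
+  override def bootstrapServers: List[String] = servers
+
+  // ...and nothing to stop.
+  override def stop(): UIO[Unit] = UIO.unit
+}
+
+val makeRemote: Managed[Nothing, Kafka] =
+  ZManaged.make(ZIO.effectTotal(new Kafka {
+    override val kafka: Service = RemoteKafkaService(List("kafka1.example.com:9092"))
+  }))(_.kafka.stop())
+```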
+
+### KafkaTestEnvironment
+
+```scala
+object KafkaTestUtils {
+
+  def kafkaEnvironment(kafkaE: Managed[Nothing, Kafka]): Managed[Nothing, KafkaTestEnvironment] =
+    for {
+      testEnvironment <- TestEnvironment.Value
+      kafkaS          <- kafkaE
+    } yield new TestEnvironment(
+      testEnvironment.blocking,
+      testEnvironment.clock,
+      testEnvironment.console,
+      testEnvironment.live,
+      testEnvironment.random,
+      testEnvironment.sized,
+      testEnvironment.system
+    ) with Kafka {
+      val kafka = kafkaS.kafka
+    }
+
+  val embeddedKafkaEnvironment: Managed[Nothing, KafkaTestEnvironment] =
+    kafkaEnvironment(Kafka.makeEmbedded)
+}
+```
+
+## Back to the producer
+
+The producer function is wrapped in a withProducerStrings:
+
+```scala
+  def producerSettings =
+    for {
+      servers <- ZIO.access[Kafka](_.kafka.bootstrapServers)
+    } yield ProducerSettings(
+      servers,
+      5.seconds,
+      Map.empty
+    )
+
+  def withProducer[A, K, V](
+    r: Producer[Any, K, V] => RIO[Any with Clock with Kafka with Blocking, A],
+    kSerde: Serde[Any, K],
+    vSerde: Serde[Any, V]
+  ): RIO[KafkaTestEnvironment, A] =
+    for {
+      settings <- producerSettings
+      producer = Producer.make(settings, kSerde, vSerde)
+      lcb      <- Kafka.liveClockBlocking
+      produced <- producer.use { p =>
+                   r(p).provide(lcb)
+                 }
+    } yield produced
+
+  def withProducerStrings[A](r: Producer[Any, String, String] => RIO[Any with Clock with Kafka with Blocking, A]) =
+    withProducer(r, Serde.string, Serde.string)
+```
+
+withProducerStrings simply specializes withProducer to String keys and values.
+
+withProducer creates the settings and a ZManaged\[Producer\]. It then creates liveClockBlocking, a
+zio environment with the Live clock (which is essential when running code that
+uses scheduling or other timing features, as much of zio-kafka does).
+
+The actual producer operation is simply wrapped in the producer.use:
+
+```scala
+  _ <- producer.produce(new ProducerRecord("topic", "boo", "baa"))
+```
+
+producer.produce takes a ProducerRecord (defined in the Java Kafka client on which
+this library is based). In this case the topic is "topic", the key is "boo" and the
+value is "baa".
diff --git a/src/main/scala/zio/kafka/client/AdminClient.scala b/src/main/scala/zio/kafka/client/AdminClient.scala
new file mode 100644
index 000000000..c44e52e3a
--- /dev/null
+++ b/src/main/scala/zio/kafka/client/AdminClient.scala
@@ -0,0 +1,202 @@
+package zio.kafka.client
+
+import org.apache.kafka.clients.admin.{
+  AdminClient => JAdminClient,
+  NewTopic => JNewTopic,
+  NewPartitions => JNewPartitions,
+  TopicDescription => JTopicDescription,
+  _
+}
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.{ KafkaFuture, TopicPartitionInfo }
+import zio._
+
+import scala.collection.JavaConverters._
+
+/**
+ * Thin wrapper around the Apache Java AdminClient. See the Java API for detailed descriptions.
+ * @param adminClient the underlying Java admin client
+ * @param semaphore serializes access to the underlying client
+ */
+case class AdminClient(private val adminClient: JAdminClient, private val semaphore: Semaphore) {
+  import AdminClient._
+
+  /**
+   * Create multiple topics.
+   */
+  def createTopics(
+    newTopics: Iterable[NewTopic],
+    createTopicOptions: Option[CreateTopicsOptions] = None
+  ): BlockingTask[Unit] = {
+    val asJava = newTopics.map(_.asJava).asJavaCollection
+    semaphore.withPermit(
+      fromKafkaFutureVoid {
+        blocking
+          .effectBlocking(
+            createTopicOptions
+              .fold(adminClient.createTopics(asJava))(opts => adminClient.createTopics(asJava, opts))
+              .all()
+          )
+      }
+    )
+  }
+
+  /**
+   * Create a single topic.
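+   *
+   * Example (illustrative only; "my-topic" is a made-up name, created here with
+   * 2 partitions and replication factor 1):
+   * {{{
+   * client.createTopic(AdminClient.NewTopic("my-topic", 2, 1))
+   * }}}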
+   */
+  def createTopic(newTopic: NewTopic, validateOnly: Boolean = false): BlockingTask[Unit] =
+    createTopics(List(newTopic), Some(new CreateTopicsOptions().validateOnly(validateOnly)))
+
+  /**
+   * Delete multiple topics.
+   */
+  def deleteTopics(
+    topics: Iterable[String],
+    deleteTopicsOptions: Option[DeleteTopicsOptions] = None
+  ): BlockingTask[Unit] = {
+    val asJava = topics.asJavaCollection
+
+    semaphore.withPermit {
+      fromKafkaFutureVoid {
+        blocking
+          .effectBlocking(
+            deleteTopicsOptions
+              .fold(adminClient.deleteTopics(asJava))(opts => adminClient.deleteTopics(asJava, opts))
+              .all()
+          )
+      }
+    }
+  }
+
+  /**
+   * Delete a single topic.
+   */
+  def deleteTopic(topic: String): BlockingTask[Unit] =
+    deleteTopics(List(topic))
+
+  /**
+   * List the topics in the cluster.
+   */
+  def listTopics(listTopicsOptions: Option[ListTopicsOptions] = None): BlockingTask[Map[String, TopicListing]] =
+    semaphore.withPermit {
+      fromKafkaFuture {
+        blocking.effectBlocking(
+          listTopicsOptions.fold(adminClient.listTopics())(opts => adminClient.listTopics(opts)).namesToListings()
+        )
+      }.map(_.asScala.toMap)
+    }
+
+  /**
+   * Describe the specified topics.
+   */
+  def describeTopics(
+    topicNames: Iterable[String],
+    describeTopicsOptions: Option[DescribeTopicsOptions] = None
+  ): BlockingTask[Map[String, TopicDescription]] = {
+    val asJava = topicNames.asJavaCollection
+    semaphore.withPermit {
+      fromKafkaFuture {
+        blocking.effectBlocking(
+          describeTopicsOptions
+            .fold(adminClient.describeTopics(asJava))(opts => adminClient.describeTopics(asJava, opts))
+            .all()
+        )
+      }.map(_.asScala.mapValues(AdminClient.TopicDescription(_)).toMap)
+    }
+  }
+
+  /**
+   * Add new partitions to a topic.
+   */
+  def createPartitions(
+    newPartitions: Map[String, NewPartitions],
+    createPartitionsOptions: Option[CreatePartitionsOptions] = None
+  ): BlockingTask[Unit] = {
+    val asJava = newPartitions.mapValues(_.asJava).asJava
+
+    semaphore.withPermit {
+      fromKafkaFutureVoid {
+        blocking.effectBlocking(
+          createPartitionsOptions
+            .fold(adminClient.createPartitions(asJava))(opts => adminClient.createPartitions(asJava, opts))
+            .all()
+        )
+      }
+    }
+  }
+}
+
+object AdminClient {
+  def fromKafkaFuture[R, T](kfv: RIO[R, KafkaFuture[T]]): RIO[R, T] =
+    kfv.flatMap { f =>
+      Task.effectAsync[T] { cb =>
+        f.whenComplete {
+          new KafkaFuture.BiConsumer[T, Throwable] {
+            def accept(t: T, e: Throwable) =
+              if (e ne null) cb(Task.fail(e))
+              else cb(Task.succeed(t))
+          }
+        }
+        ()
+      }
+    }
+
+  def fromKafkaFutureVoid[R](kfv: RIO[R, KafkaFuture[Void]]): RIO[R, Unit] =
+    fromKafkaFuture(kfv).unit
+
+  case class NewTopic(
+    name: String,
+    numPartitions: Int,
+    replicationFactor: Short,
+    configs: Map[String, String] = Map()
+  ) {
+    def asJava: JNewTopic = {
+      val jn = new JNewTopic(name, numPartitions, replicationFactor)
+
+      if (configs.nonEmpty)
+        jn.configs(configs.asJava)
+
+      jn
+    }
+  }
+
+  case class NewPartitions(
+    totalCount: Int,
+    newAssignments: List[List[Int]] = Nil
+  ) {
+    def asJava =
+      if (newAssignments.nonEmpty)
+        JNewPartitions.increaseTo(totalCount, newAssignments.map(_.map(Int.box).asJava).asJava)
+      else JNewPartitions.increaseTo(totalCount)
+  }
+
+  case class TopicDescription(
+    name: String,
+    internal: Boolean,
+    partitions: List[TopicPartitionInfo],
+    authorizedOperations: Set[AclOperation]
+  )
+
+  object TopicDescription {
+    def apply(jt: JTopicDescription): TopicDescription =
+      TopicDescription(jt.name, jt.isInternal, jt.partitions.asScala.toList, jt.authorizedOperations.asScala.toSet)
+  }
+
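+  /**
+   * Configuration for make: the bootstrap servers plus any additional
+   * admin client properties, which are passed through verbatim.
+   */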
+  case class KafkaAdminClientConfig(
+    bootstrapServers: List[String],
+    additionalConfig: Map[String, AnyRef] = Map.empty
+  )
+
+  def make(config: KafkaAdminClientConfig) =
+    ZManaged.make {
+      val configMap = (config.additionalConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.bootstrapServers
+        .mkString(","))).asJava
+      for {
+        ac  <- ZIO(JAdminClient.create(configMap))
+        sem <- Semaphore.make(1L)
+      } yield AdminClient(ac, sem)
+    } { client =>
+      ZIO.effectTotal(client.adminClient.close())
+    }
+}
diff --git a/src/main/scala/zio/kafka/client/Consumer.scala b/src/main/scala/zio/kafka/client/Consumer.scala
index 4b0b3e6de..74ce73dd6 100644
--- a/src/main/scala/zio/kafka/client/Consumer.scala
+++ b/src/main/scala/zio/kafka/client/Consumer.scala
@@ -16,7 +16,8 @@ class Consumer private (
   private val consumer: ConsumerAccess,
   private val settings: ConsumerSettings,
   private val runloop: Runloop
-) { self =>
+) {
+  self =>
 
   def assignment: BlockingTask[Set[TopicPartition]] =
     consumer.withConsumer(_.assignment().asScala.toSet)
@@ -30,7 +31,10 @@ class Consumer private (
     partitions: Set[TopicPartition],
     timeout: Duration = Duration.Infinity
   ): BlockingTask[Map[TopicPartition, Long]] =
-    consumer.withConsumer(_.endOffsets(partitions.asJava, timeout.asJava).asScala.mapValues(_.longValue()).toMap)
+    consumer.withConsumer { c =>
+      val offs = c.endOffsets(partitions.asJava, timeout.asJava)
+      offs.asScala.mapValues(_.longValue()).toMap
+    }
 
   /**
    * Stops consumption of data, drains buffered records, and ends the attached
diff --git a/src/test/scala/zio/kafka/client/ClientAdminTest.scala b/src/test/scala/zio/kafka/client/ClientAdminTest.scala
new file mode 100644
index 000000000..691484380
--- /dev/null
+++ b/src/test/scala/zio/kafka/client/ClientAdminTest.scala
@@ -0,0 +1,73 @@
+package zio.kafka.client
+
+import zio.test._
+import zio.test.Assertion._
+import zio.test.TestAspect._
+
+import ClientAdminTestHelper._
+
+object ClientAdminTest
+    extends DefaultRunnableSpec(
+      suite("client admin test")(
+        List(singleTopic, multiTopics, describeTopics): _*
+      ).provideManagedShared(KafkaTestUtils.embeddedKafkaEnvironment) @@ sequential
+    )
+
+object ClientAdminTestHelper {
+
+  val singleTopic = testM("create, list, delete single topic") {
+    KafkaTestUtils.withAdmin { client =>
+      for {
+        list1 <- client.listTopics()
+        _     <- client.createTopic(AdminClient.NewTopic("topic1", 1, 1))
+        list2 <- client.listTopics()
+        _     <- client.deleteTopic("topic1")
+        list3 <- client.listTopics()
+      } yield assert(list1.size, equalTo(0)) &&
+        assert(list2.size, equalTo(1)) &&
+        assert(list3.size, equalTo(0))
+    }
+  }
+
+  val multiTopics = testM("create, list, delete multiple topics") {
+    KafkaTestUtils.withAdmin { client =>
+      for {
+        list1 <- client.listTopics()
+        _     <- client.createTopics(List(AdminClient.NewTopic("topic2", 1, 1), AdminClient.NewTopic("topic3", 4, 1)))
+        list2 <- client.listTopics()
+        _     <- client.deleteTopic("topic2")
+        list3 <- client.listTopics()
+        _     <- client.deleteTopic("topic3")
+        list4 <- client.listTopics()
+      } yield assert(list1.size, equalTo(0)) &&
+        assert(list2.size, equalTo(2)) &&
+        assert(list3.size, equalTo(1)) &&
+        assert(list4.size, equalTo(0))
+    }
+  }
+
+  val listOnly = testM("just list") {
+    KafkaTestUtils.withAdmin { client =>
+      for {
+        list1 <- client.listTopics()
+      } yield assert(list1.size, equalTo(0))
+    }
+  }
+
+  val describeTopics = testM("create, describe, delete multiple topics") {
+    KafkaTestUtils.withAdmin { client =>
+      for {
+        list1        <- client.listTopics()
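+        // create two topics in a single call, then verify both can be described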
+        _            <- client.createTopics(List(AdminClient.NewTopic("topic4", 1, 1), AdminClient.NewTopic("topic5", 4, 1)))
+        descriptions <- client.describeTopics(List("topic4", "topic5"))
+        _            <- client.deleteTopics(List("topic4", "topic5"))
+        list3        <- client.listTopics()
+      } yield assert(list1.size, equalTo(0)) &&
+        assert(descriptions.size, equalTo(2)) &&
+        assert(list3.size, equalTo(0))
+    }
+  }
+}
diff --git a/src/test/scala/zio/kafka/client/ConsumerTest.scala b/src/test/scala/zio/kafka/client/ConsumerTest.scala
index d3c444d10..f66c7dc08 100644
--- a/src/test/scala/zio/kafka/client/ConsumerTest.scala
+++ b/src/test/scala/zio/kafka/client/ConsumerTest.scala
@@ -162,5 +162,5 @@ object ConsumerTest
         }
       } yield assert(records, isEmpty)
     }
-  ).provideManagedShared(KafkaTestUtils.kafkaEnvironment)
+  ).provideManagedShared(KafkaTestUtils.embeddedKafkaEnvironment)
 )
diff --git a/src/test/scala/zio/kafka/client/ConsumerTest2.scala b/src/test/scala/zio/kafka/client/ConsumerTest2.scala
index 6ead586dc..cb437a477 100644
--- a/src/test/scala/zio/kafka/client/ConsumerTest2.scala
+++ b/src/test/scala/zio/kafka/client/ConsumerTest2.scala
@@ -48,7 +48,7 @@ object ConsumerTest2
         consumedMessages <- messagesReceived.get
       } yield assert(consumedMessages, contains(newMessage).negate)
     }
-  ).provideManagedShared(KafkaTestUtils.kafkaEnvironment)
+  ).provideManagedShared(KafkaTestUtils.embeddedKafkaEnvironment)
 )
 
 object TestHelper {
diff --git a/src/test/scala/zio/kafka/client/EmbeddedMultiConsumerTest.scala b/src/test/scala/zio/kafka/client/EmbeddedMultiConsumerTest.scala
new file mode 100644
index 000000000..a6e278997
--- /dev/null
+++ b/src/test/scala/zio/kafka/client/EmbeddedMultiConsumerTest.scala
@@ -0,0 +1,11 @@
+package zio.kafka.client
+
+import zio.test.{ suite, DefaultRunnableSpec }
+import MultiConsumerTestHelper._
+
+object EmbeddedMultiConsumerTest
+    extends DefaultRunnableSpec(
+      suite("consumer test suite3 - multiple consumers")(
+        List(testMultipleConsumers, testSingleConsumerManyRecords): _*
+      ).provideManagedShared(KafkaTestUtils.embeddedKafkaEnvironment)
+    )
diff --git a/src/test/scala/zio/kafka/client/KafkaTestUtils.scala b/src/test/scala/zio/kafka/client/KafkaTestUtils.scala
index b64baf829..a571111b5 100644
--- a/src/test/scala/zio/kafka/client/KafkaTestUtils.scala
+++ b/src/test/scala/zio/kafka/client/KafkaTestUtils.scala
@@ -8,7 +8,9 @@ import zio.blocking.Blocking
 import zio.clock.Clock
 import zio.kafka.client.serde.Serde
 import zio.duration._
+import zio.kafka.client.AdminClient.KafkaAdminClientConfig
 import zio.kafka.client.Kafka.KafkaTestEnvironment
+import zio.random.Random
 import zio.test.environment.{ Live, TestEnvironment }
 
 trait Kafka {
@@ -23,15 +25,25 @@ object Kafka {
   case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends Kafka.Service {
     override def bootstrapServers: List[String] = List(s"localhost:${embeddedK.config.kafkaPort}")
+    override def stop(): UIO[Unit]              = ZIO.effectTotal(embeddedK.stop(true))
+  }
+
+  case object DefaultLocal extends Kafka.Service {
+    override def bootstrapServers: List[String] = List(s"localhost:9092")
 
-    override def stop(): UIO[Unit] = ZIO.effectTotal(embeddedK.stop(true))
+    override def stop(): UIO[Unit] = UIO.unit
   }
 
-  val make: Managed[Nothing, Kafka] =
+  val makeEmbedded: Managed[Nothing, Kafka] =
     ZManaged.make(ZIO.effectTotal(new Kafka {
       override val kafka: Service = EmbeddedKafkaService(EmbeddedKafka.start())
     }))(_.kafka.stop())
 
+  val makeLocal: Managed[Nothing, Kafka] =
+    ZManaged.make(ZIO.effectTotal(new Kafka {
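+      // uses an already-running broker on localhost:9092, so its stop() is a no-op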
+      override val kafka: Service = DefaultLocal
+    }))(_.kafka.stop())
+
   type KafkaTestEnvironment = Kafka with TestEnvironment
 
   type KafkaClockBlocking = Kafka with Clock with Blocking
 
@@ -42,7 +54,7 @@ object Kafka {
       blcking <- ZIO.environment[Blocking]
       kfka    <- ZIO.environment[Kafka]
     } yield new Kafka with Clock with Blocking {
-      override def kafka: Service = kfka.kafka
+      override val kafka: Service = kfka.kafka
 
       override val clock: Clock.Service[Any]       = clck.clock
       override val blocking: Blocking.Service[Any] = blcking.blocking
@@ -52,10 +64,10 @@ object Kafka {
 
 object KafkaTestUtils {
 
-  val kafkaEnvironment: Managed[Nothing, KafkaTestEnvironment] =
+  def kafkaEnvironment(kafkaE: Managed[Nothing, Kafka]): Managed[Nothing, KafkaTestEnvironment] =
     for {
       testEnvironment <- TestEnvironment.Value
-      kafkaService    <- Kafka.make
+      kafkaS          <- kafkaE
     } yield new TestEnvironment(
       testEnvironment.blocking,
       testEnvironment.clock,
@@ -65,9 +77,15 @@ object KafkaTestUtils {
       testEnvironment.sized,
       testEnvironment.system
     ) with Kafka {
-      val kafka = kafkaService.kafka
+      val kafka = kafkaS.kafka
     }
 
+  val embeddedKafkaEnvironment: Managed[Nothing, KafkaTestEnvironment] =
+    kafkaEnvironment(Kafka.makeEmbedded)
+
+  val localKafkaEnvironment: Managed[Nothing, KafkaTestEnvironment] =
+    kafkaEnvironment(Kafka.makeLocal)
+
   def producerSettings =
     for {
       servers <- ZIO.access[Kafka](_.kafka.bootstrapServers)
@@ -99,7 +117,7 @@ object KafkaTestUtils {
       p.produce(new ProducerRecord(t, k, m))
     }
 
-  def produceMany(t: String, kvs: List[(String, String)]) =
+  def produceMany(t: String, kvs: Iterable[(String, String)]) =
     withProducerStrings { p =>
       val records = kvs.map {
         case (k, v) => new ProducerRecord[String, String](t, k, v)
@@ -108,7 +126,7 @@ object KafkaTestUtils {
       p.produceChunk(chunk)
     }
 
-  def produceMany(topic: String, partition: Int, kvs: List[(String, String)]) =
+  def produceMany(topic: String, partition: Int, kvs: Iterable[(String, String)]) =
     withProducerStrings { p =>
       val records = kvs.map {
         case (k, v) => new ProducerRecord[String, String](topic, partition, null, k, v)
@@ -160,4 +178,34 @@ object KafkaTestUtils {
         } yield consumed).provide(lcb)
     } yield inner
 
+  def adminSettings =
+    for {
+      servers <- ZIO.access[Kafka](_.kafka.bootstrapServers)
+    } yield KafkaAdminClientConfig(servers)
+
+  def withAdmin[T](f: AdminClient => RIO[Any with Clock with Kafka with Blocking, T]) =
+    for {
+      settings <- adminSettings
+      lcb      <- Kafka.liveClockBlocking
+      fRes     <- AdminClient
+                   .make(settings)
+                   .use { client =>
+                     f(client)
+                   }
+                   .provide(lcb)
+    } yield fRes
+
+  // temporary workaround for zio issue #2166 (broken Duration.Infinity)
+  val veryLongTime = Duration.fromNanos(Long.MaxValue)
+
+  def randomThing(prefix: String) =
+    for {
+      random <- ZIO.environment[Random]
+      l      <- random.random.nextLong(8)
+    } yield s"$prefix-$l"
+
+  def randomTopic = randomThing("topic")
+
+  def randomGroup = randomThing("group")
+
 }
diff --git a/src/test/scala/zio/kafka/client/LocalMultiConsumerTest.scala b/src/test/scala/zio/kafka/client/LocalMultiConsumerTest.scala
new file mode 100644
index 000000000..dc2f6c610
--- /dev/null
+++ b/src/test/scala/zio/kafka/client/LocalMultiConsumerTest.scala
@@ -0,0 +1,13 @@
+package zio.kafka.client
+
+import zio.test.{ suite, DefaultRunnableSpec }
+
+/**
+ * A simple example of a trivial change to the consumer test so that it runs against a
+ * locally running Kafka. Left in because it might be useful; empty because otherwise it
+ * would fail CI.
+ */
+object LocalMultiConsumerTest
+    extends DefaultRunnableSpec(
+      suite("consumer test suite3 - parallel consumers local running kafka (empty to pass ci)")(
+      ).provideManagedShared(KafkaTestUtils.localKafkaEnvironment)
+    )
diff --git a/src/test/scala/zio/kafka/client/MultiConsumerTestHelper.scala b/src/test/scala/zio/kafka/client/MultiConsumerTestHelper.scala
new file mode 100644
index 000000000..5ddf09fd1
--- /dev/null
+++ b/src/test/scala/zio/kafka/client/MultiConsumerTestHelper.scala
@@ -0,0 +1,81 @@
+package zio.kafka.client
+
+import zio.test._
+import zio._
+import zio.kafka.client.serde.Serde
+
+object MultiConsumerTestHelper {
+  import KafkaTestUtils._
+
+  def makeTopic(name: String, nParts: Int) = withAdmin { admin =>
+    admin.createTopic(AdminClient.NewTopic(name, nParts, 1))
+  }
+
+  case class UsefulInfo(consumerIndex: Int, topic: String, partition: Int, offset: Long, key: String, value: String)
+
+  def consumeN(topic: String, groupId: String, consumerIndex: Int, nTakes: Int) =
+    withConsumer(groupId, "client1") { consumer =>
+      for {
+        data <- consumer
+                 .subscribeAnd(Subscription.Topics(Set(topic)))
+                 .partitionedStream(Serde.string, Serde.string)
+                 .flatMap(_._2.flattenChunks)
+                 .take(nTakes)
+                 .runCollect
+                 .map(
+                   x =>
+                     x.map { item =>
+                       UsefulInfo(
+                         consumerIndex,
+                         item.record.topic,
+                         item.record.partition,
+                         item.offset.offset,
+                         item.record.key,
+                         item.record.value
+                       )
+                     }
+                 )
+      } yield data
+    }
+
+  def makeMany(topic: String, howMany: Int) = {
+    val many = 1.to(howMany).map { i =>
+      val k = i // % 8
+      (s"key-$k", s"value-$i")
+    }
+    produceMany(topic, many)
+  }
+
+  val testMultipleConsumers = testM("test multiple consumers") {
+    for {
+      topic           <- randomTopic
+      consumerGroupId <- randomGroup
+      _               <- makeTopic(topic, 5)
+      _               <- makeMany(topic, 1000)
+      consumed        = 0.to(4).map(i => MultiConsumerTestHelper.consumeN(topic, consumerGroupId, i, 3))
+      _               <- ZIO.collectAll(consumed)
+    } yield assertCompletes
+  }
+
+  val testParallelConsumers = testM("test parallel consumers") {
+    for {
+      topic           <- randomTopic
+      consumerGroupId <- randomGroup
+      _               <- makeTopic(topic, 5)
+      _               <- makeMany(topic, 1000)
+      consumed        = 0.to(4).map(i => MultiConsumerTestHelper.consumeN(topic, consumerGroupId, i, 3))
+      _               <- ZIO.collectAllPar(consumed)
+    } yield assertCompletes
+  }
+
+  val testSingleConsumerManyRecords = testM("single consumer, many records") {
+    for {
+      topic           <- randomTopic
+      consumerGroupId <- randomGroup
+      _               <- makeMany(topic, 100000)
+      _               <- MultiConsumerTestHelper.consumeN(topic, consumerGroupId, 0, 100000)
+    } yield assertCompletes
+  }
+}
diff --git a/src/test/scala/zio/kafka/client/ProducerTest.scala b/src/test/scala/zio/kafka/client/ProducerTest.scala
index dfe846ffb..09c8a0257 100644
--- a/src/test/scala/zio/kafka/client/ProducerTest.scala
+++ b/src/test/scala/zio/kafka/client/ProducerTest.scala
@@ -67,5 +67,5 @@ object ProducerTest
         }
       } yield assert(outcome.length, equalTo(0))
     }
-  ).provideManagedShared(KafkaTestUtils.kafkaEnvironment)
+  ).provideManagedShared(KafkaTestUtils.embeddedKafkaEnvironment)
 )