Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make tests faster #1139

Merged
merged 2 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ lazy val embeddedKafkaVersion = "3.6.1" // Should be the same as kafkaVersion, e

lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.14"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.4.14"

enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)

Expand Down Expand Up @@ -166,11 +166,11 @@ lazy val zioKafkaExample =
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.0.20",
"dev.zio" %% "zio-kafka" % "2.7.1",
"dev.zio" %% "zio-kafka-testkit" % "2.7.1" % Test,
"dev.zio" %% "zio-test" % "2.0.20" % Test,
"ch.qos.logback" % "logback-classic" % "1.4.14",
"dev.zio" %% "zio-logging-slf4j2" % "2.1.16",
"io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion
"io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion,
logback,
"dev.zio" %% "zio-kafka-testkit" % "2.7.1" % Test,
"dev.zio" %% "zio-test" % "2.0.20" % Test
),
// Scala 3 compiling fails with:
// [error] Modules were resolved with conflicting cross-version suffixes in ProjectRef(uri("file:/home/runner/work/zio-kafka/zio-kafka/"), "zioKafkaExample"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import zio.kafka.serde.Serde
import zio.kafka.testkit.KafkaTestUtils.{ consumer, produceMany, producer }
import zio.kafka.testkit._
import zio.test.Assertion.hasSameElements
import zio.test.TestAspect.{ sequential, timeout }
import zio.test.TestAspect.timeout
import zio.test._

/**
Expand Down Expand Up @@ -41,5 +41,5 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
)
.provideSome[Kafka](producer) // Here, we provide a new instance of Producer per test
.provideSomeShared[Scope](Kafka.embedded) // Here, we provide an instance of Kafka for the entire suite
) @@ timeout(2.minutes) @@ sequential
) @@ timeout(2.minutes)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils._
import zio.test.TestAspect.{ sequential, timeout }
import zio.test.TestAspect.timeout
import zio.test._

/**
Expand All @@ -25,5 +25,5 @@ object ProducerSpec extends ZIOSpecDefault {
)
.provideSome[Kafka](producer) // Here, we provide a new instance of Producer per test
.provideSomeShared[Scope](Kafka.embedded) // Here, we provide an instance of Kafka for the entire suite
) @@ timeout(2.minutes) @@ sequential
) @@ timeout(2.minutes)
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ object AdminSaslSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
assert(remainingAcls)(equalTo(Set.empty[AclBinding]))
}
}
).provideSomeShared[Scope](Kafka.saslEmbedded) @@ withLiveClock @@ sequential
).provideSomeShared[Scope](Kafka.saslEmbedded) @@ withLiveClock

}
84 changes: 39 additions & 45 deletions zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,91 +39,85 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {

override val kafkaPrefix: String = "adminspec"

private def listTopicsFiltered(client: AdminClient): ZIO[Any, Throwable, Map[String, AdminClient.TopicListing]] =
client.listTopics().map(_.filter { case (key, _) => key.startsWith("adminspec-") })
private def listTopicsFiltered(
client: AdminClient,
prefix: String
): ZIO[Any, Throwable, Map[String, AdminClient.TopicListing]] =
client.listTopics().map(_.filter { case (key, _) => key.startsWith(prefix) })

override def spec: Spec[TestEnvironment with Scope, Throwable] =
suite("client admin test")(
test("create, list, delete single topic") {
val prefix = "adminspec1"
KafkaTestUtils.withAdmin { client =>
for {
list1 <- listTopicsFiltered(client)
_ <- client.createTopic(AdminClient.NewTopic("adminspec-topic1", 1, 1))
list2 <- listTopicsFiltered(client)
_ <- client.deleteTopic("adminspec-topic1")
list3 <- listTopicsFiltered(client)
} yield assert(list1.size)(equalTo(0)) &&
assert(list2.size)(equalTo(1)) &&
assert(list3.size)(equalTo(0))

list1 <- listTopicsFiltered(client, prefix)
_ <- client.createTopic(AdminClient.NewTopic(s"$prefix-topic1", 1, 1))
list2 <- listTopicsFiltered(client, prefix)
_ <- client.deleteTopic(s"$prefix-topic1")
list3 <- listTopicsFiltered(client, prefix)
} yield assertTrue(list1.isEmpty, list2.size == 1, list3.isEmpty)
}
},
test("create, list, delete multiple topic") {
val prefix = "adminspec2"
KafkaTestUtils.withAdmin { client =>
for {
list1 <- listTopicsFiltered(client)
list1 <- listTopicsFiltered(client, prefix)
_ <- client.createTopics(
List(AdminClient.NewTopic("adminspec-topic2", 1, 1), AdminClient.NewTopic("adminspec-topic3", 4, 1))
List(AdminClient.NewTopic(s"$prefix-topic2", 1, 1), AdminClient.NewTopic(s"$prefix-topic3", 4, 1))
)
list2 <- listTopicsFiltered(client)
_ <- client.deleteTopic("adminspec-topic2")
list3 <- listTopicsFiltered(client)
_ <- client.deleteTopic("adminspec-topic3")
list4 <- listTopicsFiltered(client)
} yield assert(list1.size)(equalTo(0)) &&
assert(list2.size)(equalTo(2)) &&
assert(list3.size)(equalTo(1)) &&
assert(list4.size)(equalTo(0))

list2 <- listTopicsFiltered(client, prefix)
_ <- client.deleteTopic(s"$prefix-topic2")
list3 <- listTopicsFiltered(client, prefix)
_ <- client.deleteTopic(s"$prefix-topic3")
list4 <- listTopicsFiltered(client, prefix)
} yield assertTrue(list1.isEmpty, list2.size == 2, list3.size == 1, list4.isEmpty)
}
},
test("just list") {
KafkaTestUtils.withAdmin { client =>
for {
list1 <- listTopicsFiltered(client)
} yield assert(list1.size)(equalTo(0))
list1 <- listTopicsFiltered(client, "adminspec3")
} yield assertTrue(list1.isEmpty)

}
},
test("create, describe, delete multiple topic") {
val prefix = "adminspec4"
KafkaTestUtils.withAdmin { client =>
for {
list1 <- listTopicsFiltered(client)
list1 <- listTopicsFiltered(client, prefix)
_ <- client.createTopics(
List(AdminClient.NewTopic("adminspec-topic4", 1, 1), AdminClient.NewTopic("adminspec-topic5", 4, 1))
List(AdminClient.NewTopic(s"$prefix-topic4", 1, 1), AdminClient.NewTopic(s"$prefix-topic5", 4, 1))
)
descriptions <- client.describeTopics(List("adminspec-topic4", "adminspec-topic5"))
_ <- client.deleteTopics(List("adminspec-topic4", "adminspec-topic5"))
list3 <- listTopicsFiltered(client)
} yield assert(list1.size)(equalTo(0)) &&
assert(descriptions.size)(equalTo(2)) &&
assert(list3.size)(equalTo(0))

descriptions <- client.describeTopics(List(s"$prefix-topic4", s"$prefix-topic5"))
_ <- client.deleteTopics(List(s"$prefix-topic4", s"$prefix-topic5"))
list3 <- listTopicsFiltered(client, prefix)
} yield assertTrue(list1.isEmpty, descriptions.size == 2, list3.isEmpty)
}
},
test("create, describe topic config, delete multiple topic") {
val prefix = "adminspec5"
KafkaTestUtils.withAdmin { client =>
for {
list1 <- listTopicsFiltered(client)
list1 <- listTopicsFiltered(client, prefix)
_ <- client.createTopics(
List(AdminClient.NewTopic("adminspec-topic6", 1, 1), AdminClient.NewTopic("adminspec-topic7", 4, 1))
List(AdminClient.NewTopic(s"$prefix-topic6", 1, 1), AdminClient.NewTopic(s"$prefix-topic7", 4, 1))
)
configResources = List(
ConfigResource(ConfigResourceType.Topic, "adminspec-topic6"),
ConfigResource(ConfigResourceType.Topic, "adminspec-topic7")
ConfigResource(ConfigResourceType.Topic, s"$prefix-topic6"),
ConfigResource(ConfigResourceType.Topic, s"$prefix-topic7")
)
configs <- client.describeConfigs(configResources) <&>
client.describeConfigsAsync(configResources).flatMap { configs =>
ZIO.foreachPar(configs) { case (resource, configTask) =>
configTask.map(config => (resource, config))
}
}
_ <- client.deleteTopics(List("adminspec-topic6", "adminspec-topic7"))
list3 <- listTopicsFiltered(client)
} yield assert(list1.size)(equalTo(0)) &&
assert(configs._1.size)(equalTo(2)) &&
assert(configs._2.size)(equalTo(2)) &&
assert(list3.size)(equalTo(0))
_ <- client.deleteTopics(List(s"$prefix-topic6", s"$prefix-topic7"))
list3 <- listTopicsFiltered(client, prefix)
} yield assertTrue(list1.isEmpty, configs._1.size == 2, configs._2.size == 2, list3.isEmpty)
}
},
test("list cluster nodes") {
Expand Down Expand Up @@ -636,7 +630,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
assert(remainingAcls)(equalTo(Set.empty[AclBinding]))
}
}
).provideSomeShared[Scope](Kafka.embedded) @@ withLiveClock @@ sequential @@ timeout(2.minutes)
).provideSomeShared[Scope](Kafka.embedded) @@ withLiveClock @@ timeout(2.minutes)

private def consumeNoop(
topicName: String,
Expand Down
21 changes: 11 additions & 10 deletions zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import zio.test.TestAspect._
import zio.test._

import java.nio.charset.StandardCharsets
import java.util.UUID

object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
override val kafkaPrefix: String = "producerspec"

private def asString(v: Array[Byte]) = new String(v, StandardCharsets.UTF_8)

def withConsumerInt(
private def withConsumerInt(
subscription: Subscription,
settings: ConsumerSettings
): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, CommittableRecord[String, Int]]]] =
Expand Down Expand Up @@ -246,13 +247,13 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException))
}
}
} yield assertTrue(outcome.length == 3) &&
assertTrue(outcome(0).isRight) &&
assertTrue(
outcome(1).swap.exists(_.getMessage.contains("Compacted topic cannot accept message without key"))
) &&
assertTrue(outcome(2).isRight) &&
assertTrue(recordsConsumed.length == 2)
} yield assertTrue(
outcome.length == 3,
outcome(0).isRight,
outcome(1).swap.exists(_.getMessage.contains("Compacted topic cannot accept message without key")),
outcome(2).isRight,
recordsConsumed.length == 2
)
},
test("an empty chunk of records") {
val chunks = Chunk.fromIterable(List.empty)
Expand Down Expand Up @@ -646,11 +647,11 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
produceChunkSpec
)
.provideSome[Kafka](
(KafkaTestUtils.producer ++ transactionalProducer)
(KafkaTestUtils.producer ++ transactionalProducer(UUID.randomUUID().toString))
.mapError(TestFailure.fail),
KafkaTestUtils.consumer(clientId = "producer-spec-consumer", groupId = Some("group-0"))
)
.provideSomeShared[Scope](
Kafka.embedded
) @@ withLiveClock @@ timeout(3.minutes) @@ sequential
) @@ withLiveClock @@ timeout(3.minutes)
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assert(offset.map(_.offset))(isSome(equalTo(9L)))
},
test("process outstanding commits after a graceful shutdown with aggregateAsync using `maxRebalanceDuration`") {
val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
val topic = "test-outstanding-commits"
val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
for {
topic <- randomTopic
group <- randomGroup
client <- randomClient
_ <- produceMany(topic, kvs)
Expand Down Expand Up @@ -354,7 +354,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
)
)
} yield assertTrue(offset.map(_.offset).contains(9L))
} @@ TestAspect.nonFlaky(5),
} @@ TestAspect.nonFlaky(2),
test("a consumer timeout interrupts the stream and shuts down the consumer") {
// Setup of this test:
// - Set the max poll interval very low: a couple of seconds.
Expand Down Expand Up @@ -1282,7 +1282,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
transactionalConsumer(
clientId,
consumerGroupId,
offsetRetrieval = OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest),
restartStreamOnRebalancing = true,
properties = Map(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG ->
Expand All @@ -1297,7 +1296,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
}

for {
tProducerSettings <- transactionalProducerSettings
transactionalId <- randomThing("transactional")
tProducerSettings <- transactionalProducerSettings(transactionalId)
tProducer <- TransactionalProducer.make(tProducerSettings)

topicA <- randomTopic
Expand Down Expand Up @@ -1355,7 +1355,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
transactionalConsumer(
validatorClientId,
groupB,
offsetRetrieval = OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest),
properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200")
)
)
Expand Down Expand Up @@ -1558,6 +1557,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
Kafka.embedded
) @@ withLiveClock @@ sequential @@ timeout(2.minutes)
) @@ withLiveClock @@ timeout(2.minutes)

}
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
.take(40)
.transduce(
Consumer.offsetBatches.contramap[CommittableRecord[String, String]](_.offset) <&> ZSink
.collectAll[CommittableRecord[String, String]]
Consumer.offsetBatches.contramap[CommittableRecord[String, String]](_.offset) <&>
ZSink.collectAll[CommittableRecord[String, String]]
)
.mapZIO { case (offsetBatch, records) => offsetBatch.commit.as(records) }
.flattenChunks
Expand All @@ -278,7 +278,7 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.provideSomeLayer[Kafka with Scope](consumer(client, Some(group)))
consumed <- recordsConsumed.get
} yield assert(consumed.map(r => r.value))(hasSameElements(Chunk.fromIterable(kvs.map(_._2))))
} @@ TestAspect.nonFlaky(3)
} @@ TestAspect.nonFlaky(2)
)
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ object SslHelperSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
suite(".validateEndpoint")(
integrationTests,
unitTests
) @@ withLiveClock @@ sequential
) @@ withLiveClock

implicit class SettingsHelper(adminClientSettings: AdminClientSettings) {
def bootstrapServers: List[String] = adminClientSettings.driverSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,31 @@ object KafkaTestUtils {
)

/**
* Default transactional Producer settings you can use in your tests.
* Default transactional producer settings you can use in your tests.
*
* Note: to run multiple tests in parallel, you need to use different transactional ids via
* `transactionalProducerSettings(transactionalId)`.
*/
val transactionalProducerSettings: ZIO[Kafka, Nothing, TransactionalProducerSettings] =
transactionalProducerSettings("test-transaction")

def transactionalProducerSettings(transactionalId: String): ZIO[Kafka, Nothing, TransactionalProducerSettings] =
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
ZIO
.serviceWith[Kafka](_.bootstrapServers)
.map(TransactionalProducerSettings(_, "test-transaction"))
.map(TransactionalProducerSettings(_, transactionalId))

/**
* Transactional Producer instance you can use in your tests. It uses the default transactional Producer settings.
* Transactional producer instance you can use in your tests. It uses the default transactional producer settings.
*
* Note: to run multiple tests in parallel, you need to use different transactional ids via
* `transactionalProducer(transactionalId)`.
*/
val transactionalProducer: ZLayer[Kafka, Throwable, TransactionalProducer] =
transactionalProducer("test-transaction")

def transactionalProducer(transactionalId: String): ZLayer[Kafka, Throwable, TransactionalProducer] =
ZLayer.makeSome[Kafka, TransactionalProducer](
ZLayer(transactionalProducerSettings),
ZLayer(transactionalProducerSettings(transactionalId)),
TransactionalProducer.live
)

Expand Down Expand Up @@ -234,7 +246,7 @@ object KafkaTestUtils {
clientId: String,
groupId: String,
clientInstanceId: Option[String] = None,
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest),
allowAutoCreateTopics: Boolean = true,
diagnostics: Diagnostics = Diagnostics.NoOp,
restartStreamOnRebalancing: Boolean = false,
Expand Down
Loading
Loading