Skip to content

Commit

Permalink
Fix test case and reduce test duration
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Feb 26, 2023
1 parent 33ea3f2 commit 74e079e
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ object SubscriptionsSpec extends ZIOKafkaSpec {
} yield assertCompletes
},
test("can resume a stream for the same subscription") {
val kvs = (1 to 10000).toList.map(i => (s"key$i", s"msg$i"))
val kvs = (1 to 1000).toList.map(i => (s"key$i", s"msg$i"))
for {
topic1 <- randomTopic
client <- randomClient
Expand All @@ -186,7 +186,7 @@ object SubscriptionsSpec extends ZIOKafkaSpec {
_ <-
Consumer
.plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
.take(200)
.take(40)
.transduce(
Consumer.offsetBatches.contramap[CommittableRecord[String, String]](_.offset) <&> ZSink
.collectAll[CommittableRecord[String, String]]
Expand All @@ -195,11 +195,11 @@ object SubscriptionsSpec extends ZIOKafkaSpec {
.flattenChunks
.runCollect
.tap(records => recordsConsumed.update(_ ++ records))
.repeatN(50)
.repeatN(24)
.provideSomeLayer[Kafka with Scope](consumer(client, Some(group)))
consumed <- recordsConsumed.get
} yield assert(consumed.map(r => r.value))(hasSameElements(Chunk.fromIterable(kvs.map(_._2))))
} @@ TestAspect.nonFlaky(5)
} @@ TestAspect.nonFlaky(3)
).provideSomeLayerShared[TestEnvironment & Kafka](
producer ++ Scope.default ++ Runtime.removeDefaultLoggers ++ Runtime.addLogger(logger)
) @@ withLiveClock @@ TestAspect.sequential @@ timeout(600.seconds)
Expand Down

0 comments on commit 74e079e

Please sign in to comment.