Fix flaky test (#1082)
Also: switch to the monotonic nanosecond clock to avoid the time jumps that come with the regular system clock.
erikvanoosten authored Oct 15, 2023
1 parent e179000 commit f4a95c2
Showing 2 changed files with 24 additions and 18 deletions.
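Why the clock switch matters: `Clock.instant` reads the wall clock, which can jump backwards or forwards (NTP corrections, manual adjustments), so an `Instant`-based deadline can fire far too early or too late, while `Clock.nanoTime` is monotonic. Below is a minimal, self-contained sketch of the deadline pattern the diff adopts; the object name and durations are illustrative and not part of the commit:

  import zio._

  // Illustrative sketch only: compute a deadline on the monotonic clock,
  // as the changed code below does with `now + maxPollIntervalNanos`.
  object NanoDeadlineExample extends ZIOAppDefault {
    val maxPollInterval: Duration = 1.second

    val run =
      for {
        start   <- Clock.nanoTime                   // monotonic nanoseconds, immune to wall-clock jumps
        deadline = start + maxPollInterval.toNanos  // a deadline is just a Long
        _       <- ZIO.sleep(500.millis)            // well within the interval
        now     <- Clock.nanoTime
        _       <- Console.printLine(s"deadline exceeded: ${now >= deadline}") // prints: false
      } yield ()
  }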
ConsumerSpec.scala
@@ -333,17 +333,19 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
         settings <- consumerSettings(
                       clientId = clientId,
                       groupId = Some(group),
-                      maxPollInterval = 1.second
+                      maxPollInterval = 1.second,
+                      `max.poll.records` = 2
                     )
-        consumer <- Consumer.make(settings.withPollTimeout(500.millis))
-        _        <- scheduledProduce(topic1, Schedule.fixed(100.millis).jittered).runDrain.forkScoped
-        _        <- scheduledProduce(topic2, Schedule.fixed(100.millis).jittered).runDrain.forkScoped
+        consumer <- Consumer.make(settings.withPollTimeout(100.millis))
+        _        <- scheduledProduce(topic1, Schedule.fixed(50.millis).jittered).runDrain.forkScoped
+        _        <- scheduledProduce(topic2, Schedule.fixed(200.millis).jittered).runDrain.forkScoped
         // The slow consumer:
         c1 <- consumer
                .plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
                .rechunk(5) // Time out detection is at the chunk level. We need at least 2 chunks.
                .tap(r => ZIO.sleep(5.seconds).when(r.key == "key3"))
-               .take(100) // Because of chunking, we need to pull a bit more before the interrupt kicks in.
+               // Use `take` to ensure the test ends quickly, even when the interrupt fails to occur.
+               // Because of chunking, we need to pull more than 3 records before the interrupt kicks in.
+               .take(100)
                .runDrain
                .exit
                .fork
PartitionStreamControl.scala
@@ -7,9 +7,7 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
 import zio.stream.{ Take, ZStream }
 import zio.{ Chunk, Clock, Duration, LogAnnotation, Promise, Queue, Ref, UIO, ZIO }
 
-import java.time.Instant
 import java.util.concurrent.TimeoutException
-import scala.math.Ordered.orderingToOrdered
 import scala.util.control.NoStackTrace
 
 final class PartitionStreamControl private (
@@ -21,6 +19,8 @@ final class PartitionStreamControl private (
   queueInfoRef: Ref[QueueInfo],
   maxPollInterval: Duration
 ) {
+  private val maxPollIntervalNanos = maxPollInterval.toNanos
+
   private val logAnnotate = ZIO.logAnnotate(
     LogAnnotation("topic", tp.topic()),
     LogAnnotation("partition", tp.partition().toString)
@@ -29,8 +29,8 @@
   /** Offer new data for the stream to process. */
   private[internal] def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] =
     for {
-      now <- Clock.instant
-      newPullDeadline = now.plus(maxPollInterval)
+      now <- Clock.nanoTime
+      newPullDeadline = now + maxPollIntervalNanos
       _ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size))
       _ <- dataQueue.offer(Take.chunk(data))
     } yield ()
@@ -44,7 +44,7 @@
    */
   private[internal] def maxPollIntervalExceeded: UIO[Boolean] =
     for {
-      now       <- Clock.instant
+      now       <- Clock.nanoTime
       queueInfo <- queueInfoRef.get
     } yield queueInfo.deadlineExceeded(now)
 
@@ -82,16 +82,20 @@
 
 object PartitionStreamControl {
 
+  private type NanoTime = Long
+
   private[internal] def newPartitionStream(
     tp: TopicPartition,
     commandQueue: Queue[RunloopCommand],
     diagnostics: Diagnostics,
     maxPollInterval: Duration
   ): UIO[PartitionStreamControl] = {
+    val maxPollIntervalNanos = maxPollInterval.toNanos
+
     def registerPull(queueInfo: Ref[QueueInfo], recordCount: Int): UIO[Unit] =
       for {
-        now <- Clock.instant
-        newPullDeadline = now.plus(maxPollInterval)
+        now <- Clock.nanoTime
+        newPullDeadline = now + maxPollIntervalNanos
         _ <- queueInfo.update(_.withPull(newPullDeadline, recordCount))
       } yield ()
 
@@ -100,7 +104,7 @@
       interruptionPromise <- Promise.make[Throwable, Unit]
       completedPromise    <- Promise.make[Nothing, Unit]
       dataQueue           <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]]
-      now                 <- Clock.instant
+      now                 <- Clock.nanoTime
       queueInfo           <- Ref.make(QueueInfo(now, 0))
       requestAndAwaitData =
         for {
@@ -137,14 +141,14 @@
 
   // The `pullDeadline` is only relevant when `size > 0`. We initialize `pullDeadline` as soon as size goes above 0.
   // (Note that theoretically `size` can go below 0 when the update operations are reordered.)
-  private final case class QueueInfo(pullDeadline: Instant, size: Int) {
-    def withOffer(newPullDeadline: Instant, recordCount: Int): QueueInfo =
+  private final case class QueueInfo(pullDeadline: NanoTime, size: Int) {
+    def withOffer(newPullDeadline: NanoTime, recordCount: Int): QueueInfo =
       QueueInfo(if (size <= 0) newPullDeadline else pullDeadline, size + recordCount)
 
-    def withPull(newPullDeadline: Instant, recordCount: Int): QueueInfo =
+    def withPull(newPullDeadline: NanoTime, recordCount: Int): QueueInfo =
       QueueInfo(newPullDeadline, size - recordCount)
 
-    def deadlineExceeded(now: Instant): Boolean =
+    def deadlineExceeded(now: NanoTime): Boolean =
       size > 0 && pullDeadline <= now
   }
 
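To make the `QueueInfo` rules above concrete, here is a hypothetical worked trace; it assumes direct access to the private case class, with times as nanosecond `Long`s:

  val interval = 1000000000L // 1 second, in nanoseconds
  val t0       = 0L

  val empty   = QueueInfo(pullDeadline = 0L, size = 0)
  val offered = empty.withOffer(t0 + interval, recordCount = 2)      // first offer into an empty queue arms the deadline
  assert(!offered.deadlineExceeded(t0 + interval - 1))               // records queued, deadline not yet reached
  assert(offered.deadlineExceeded(t0 + interval))                    // records queued and deadline passed
  val pulled  = offered.withPull(t0 + 2 * interval, recordCount = 2) // every pull re-arms the deadline; size drops to 0
  assert(!pulled.deadlineExceeded(t0 + 9 * interval))                // size == 0, so the deadline never counts as exceeded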