Skip to content

Commit

Permalink
Delete RunloopTimeout and small improvements (#1141)
Browse files Browse the repository at this point in the history
* Delete `RunloopTimeout` as it is no longer used.
* Do not suppress error in `commitTransactionWithOffsets` (even though the only invoker still dies upon any error).
* Better debug logging in `Runloop`.
* Shave tens of seconds of test by never taking more messages than can be sent.
* Run test less often, 3 times is enough (`nonFlaky(3)`` runs the test 4 times!).
  • Loading branch information
erikvanoosten authored Dec 22, 2023
1 parent b741f60 commit d2aa7f7
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1196,7 +1196,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
* - A producer generates some messages on topic A,
* - a transactional consumer/producer pair (copier1) reads these and copies them to topic B transactionally,
* - after a few messages we start a second transactional consumer/producer pair (copier2) that does the same
* (in the same consumer group) this triggers a rebalance,
* (in the same consumer group), this triggers a rebalance,
* - produce some more messages to topic A,
* - a consumer that empties topic B,
* - when enough messages have been received, the copiers are interrupted.
Expand Down Expand Up @@ -1347,8 +1347,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
messagesOnTopicB <- ZIO.logAnnotate("consumer", "validator") {
Consumer
.plainStream(Subscription.topics(topicB), Serde.string, Serde.string)
.map(_.value)
.timeout(5.seconds)
.mapChunks(_.map(_.value))
.take(messageCount.toLong)
.timeout(10.seconds)
.runCollect
.provideSome[Kafka](
transactionalConsumer(
Expand All @@ -1373,7 +1374,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
// testForPartitionAssignmentStrategy[CooperativeStickyAssignor] // TODO not yet supported
)

}: _*) @@ TestAspect.nonFlaky(3),
}: _*) @@ TestAspect.nonFlaky(2),
test("running streams don't stall after a poll timeout") {
for {
topic <- randomTopic
Expand Down
3 changes: 1 addition & 2 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ trait Consumer {
}

object Consumer {
case object RunloopTimeout extends RuntimeException("Timeout in Runloop") with NoStackTrace
case object CommitTimeout extends RuntimeException("Commit timeout") with NoStackTrace
case object CommitTimeout extends RuntimeException("Commit timeout") with NoStackTrace

val offsetBatches: ZSink[Any, Nothing, Offset, Nothing, OffsetBatch] =
ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ add _)
Expand Down
14 changes: 10 additions & 4 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,12 @@ private[consumer] final class Runloop private (
val recordRebalanceRebalancingListener = RebalanceListener(
onAssigned = (assignedTps, _) =>
for {
_ <- ZIO.logDebug(s"${assignedTps.size} partitions are assigned")
rebalanceEvent <- lastRebalanceEvent.get
state <- currentStateRef.get
_ <- ZIO.logInfo {
val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else ""
s"${assignedTps.size} partitions are assigned$sameRebalance"
}
state <- currentStateRef.get
streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams
else Chunk.empty
_ <- endStreams(state, streamsToEnd)
Expand All @@ -213,9 +216,12 @@ private[consumer] final class Runloop private (
} yield (),
onRevoked = (revokedTps, _) =>
for {
_ <- ZIO.logDebug(s"${revokedTps.size} partitions are revoked")
rebalanceEvent <- lastRebalanceEvent.get
state <- currentStateRef.get
_ <- ZIO.logInfo {
val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else ""
s"${revokedTps.size} partitions are revoked$sameRebalance"
}
state <- currentStateRef.get
streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams
else state.assignedStreams.filter(control => revokedTps.contains(control.tp))
_ <- endStreams(state, streamsToEnd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object TransactionalProducer {

private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): Task[Unit] = {
val sendOffsetsToTransaction: Task[Unit] =
ZIO.suspendSucceed {
ZIO.suspend {
@inline def invalidGroupIdException: IO[InvalidGroupIdException, Nothing] =
ZIO.fail(
new InvalidGroupIdException(
Expand Down

0 comments on commit d2aa7f7

Please sign in to comment.