From 16454777caf30d5f43489eb6add249b68d245ea9 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Tue, 29 Oct 2019 15:14:01 -0400 Subject: [PATCH 1/4] implement graceful shutdown --- .../scala/zio/kafka/client/Consumer.scala | 3 ++ src/main/scala/zio/kafka/client/Runloop.scala | 33 +++++++++++++++---- .../scala/zio/kafka/client/ConsumerTest.scala | 19 +++++++++++ 3 files changed, 49 insertions(+), 6 deletions(-) diff --git a/src/main/scala/zio/kafka/client/Consumer.scala b/src/main/scala/zio/kafka/client/Consumer.scala index f4591260e..cf24e22e5 100644 --- a/src/main/scala/zio/kafka/client/Consumer.scala +++ b/src/main/scala/zio/kafka/client/Consumer.scala @@ -32,6 +32,9 @@ class Consumer private ( ): BlockingTask[Map[TopicPartition, Long]] = consumer.withConsumer(_.endOffsets(partitions.asJava, timeout.asJava).asScala.mapValues(_.longValue()).toMap) + def gracefulShutdown: UIO[Unit] = + runloop.deps.gracefulShutdown + def listTopics(timeout: Duration = Duration.Infinity): BlockingTask[Map[String, List[PartitionInfo]]] = consumer.withConsumer(_.listTopics(timeout.asJava).asScala.mapValues(_.asScala.toList).toMap) diff --git a/src/main/scala/zio/kafka/client/Runloop.scala b/src/main/scala/zio/kafka/client/Runloop.scala index 66cde2cea..6cccc8ba9 100644 --- a/src/main/scala/zio/kafka/client/Runloop.scala +++ b/src/main/scala/zio/kafka/client/Runloop.scala @@ -46,7 +46,8 @@ object Runloop { partitions: Queue[Take[Throwable, (TopicPartition, ZStreamChunk[Any, Throwable, ByteArrayCommittableRecord])]], rebalancingRef: Ref[Boolean], rebalanceListener: ConsumerRebalanceListener, - diagnostics: Diagnostics + diagnostics: Diagnostics, + shutdownRef: Ref[Boolean] ) { def commit(cmd: Command.Commit) = commitQueue.offer(cmd).unit def commits = ZStream.fromQueue(commitQueue) @@ -60,6 +61,14 @@ object Runloop { partitions.offer(Take.Value(tp -> data)).unit def emitIfEnabledDiagnostic(event: DiagnosticEvent) = diagnostics.emitIfEnabled(event) + + val isShutdown = shutdownRef.get + + def gracefulShutdown: UIO[Unit] = + for { + shutdown <- shutdownRef.modify((_, true)) + _ <- partitions.offer(Take.End).when(!shutdown) + } yield () } object Deps { def make( @@ -100,6 +109,7 @@ object Runloop { } } .toManaged_ + shutdownRef <- Ref.make(false).toManaged_ } yield Deps( consumer, pollFrequency, @@ -109,7 +119,8 @@ object Runloop { partitions, rebalancingRef, listener, - diagnostics + diagnostics, + shutdownRef ) } @@ -384,6 +395,12 @@ object Runloop { else doCommit(List(cmd)).as(state) } yield newState + def handleShutdown(state: State, cmd: Command): BlockingTask[State] = cmd match { + case Command.Poll() => UIO.succeed(state) + case req @ Command.Request(_, _) => req.cont.fail(None).as(state) + case cmd @ Command.Commit(_, _) => handleCommit(state, cmd) + } + ZStream .mergeAll(3, 32)( deps.polls, @@ -391,10 +408,14 @@ object Runloop { deps.commits ) .foldM(State.initial) { (state, cmd) => - cmd match { - case Command.Poll() => handlePoll(state) - case req @ Command.Request(_, _) => handleRequest(state, req) - case cmd @ Command.Commit(_, _) => handleCommit(state, cmd) + deps.isShutdown.flatMap { shutdown => + if (shutdown) handleShutdown(state, cmd) + else + cmd match { + case Command.Poll() => handlePoll(state) + case req @ Command.Request(_, _) => handleRequest(state, req) + case cmd @ Command.Commit(_, _) => handleCommit(state, cmd) + } } } .onError { cause => diff --git a/src/test/scala/zio/kafka/client/ConsumerTest.scala b/src/test/scala/zio/kafka/client/ConsumerTest.scala index 35ac67f2b..9f960382f 100644 --- a/src/test/scala/zio/kafka/client/ConsumerTest.scala +++ b/src/test/scala/zio/kafka/client/ConsumerTest.scala @@ -232,5 +232,24 @@ class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with Defa } yield consumeResult.fold(_ => succeed, _ => fail("Expected consumeWith to fail")) } } + + "shutting down gracefully" should { + "not receive messages after shutting down" in runWithConsumer("group150", "client150") { consumer => + for { + kvs <- ZIO((1 to 5).toList.map(i => (s"key$i", s"msg$i"))) + _ <- produceMany("topic150", kvs) + _ <- consumer.gracefulShutdown + records <- consumer + .subscribeAnd(Subscription.Topics(Set("topic150"))) + .plainStream(Serde.string, Serde.string) + .flattenChunks + .take(5) + .runCollect + _ <- ZIO.effectTotal(records.map { r => + (r.record.key, r.record.value) + } shouldBe empty) + } yield () + } + } } } From 0db5046af3d5cf4bab7f2858e1b86493312a6c3e Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Thu, 31 Oct 2019 15:32:06 -0400 Subject: [PATCH 2/4] address review comments --- .../scala/zio/kafka/client/Consumer.scala | 6 +- src/main/scala/zio/kafka/client/Runloop.scala | 85 ++++++++++++------- .../scala/zio/kafka/client/ConsumerTest.scala | 2 +- 3 files changed, 58 insertions(+), 35 deletions(-) diff --git a/src/main/scala/zio/kafka/client/Consumer.scala b/src/main/scala/zio/kafka/client/Consumer.scala index cf24e22e5..4b0b3e6de 100644 --- a/src/main/scala/zio/kafka/client/Consumer.scala +++ b/src/main/scala/zio/kafka/client/Consumer.scala @@ -32,7 +32,11 @@ class Consumer private ( ): BlockingTask[Map[TopicPartition, Long]] = consumer.withConsumer(_.endOffsets(partitions.asJava, timeout.asJava).asScala.mapValues(_.longValue()).toMap) - def gracefulShutdown: UIO[Unit] = + /** + * Stops consumption of data, drains buffered records, and ends the attached + * streams while still serving commit requests. + */ + def stopConsumption: UIO[Unit] = runloop.deps.gracefulShutdown def listTopics(timeout: Duration = Duration.Infinity): BlockingTask[Map[String, List[PartitionInfo]]] = diff --git a/src/main/scala/zio/kafka/client/Runloop.scala b/src/main/scala/zio/kafka/client/Runloop.scala index 6cccc8ba9..801314794 100644 --- a/src/main/scala/zio/kafka/client/Runloop.scala +++ b/src/main/scala/zio/kafka/client/Runloop.scala @@ -85,6 +85,12 @@ object Runloop { .unbounded[ Take[Throwable, (TopicPartition, ZStreamChunk[Any, Throwable, ByteArrayCommittableRecord])] ] + .map { queue => + queue.mapM { + case Take.End => queue.shutdown.as(Take.End) + case x => ZIO.succeed(x) + } + } .toManaged(_.shutdown) listener <- ZIO .runtime[Blocking] @@ -331,38 +337,44 @@ object Runloop { // is empty because pattern subscriptions start out as empty. case _: IllegalStateException => null } + deps.isShutdown.flatMap { shutdown => + if (shutdown) + ZIO.effectTotal(deps.consumer.consumer.pause(requestedPartitions.asJava)) *> + ZIO.succeed( + (Set(), (state.pendingRequests, Map[TopicPartition, Chunk[ByteArrayConsumerRecord]]())) + ) + else if (records eq null) + ZIO.succeed( + (Set(), (state.pendingRequests, Map[TopicPartition, Chunk[ByteArrayConsumerRecord]]())) + ) + else { - if (records eq null) - ZIO.succeed( - (Set(), (state.pendingRequests, Map[TopicPartition, Chunk[ByteArrayConsumerRecord]]())) - ) - else { - - val tpsInResponse = records.partitions.asScala - val currentAssigned = c.assignment().asScala - val newlyAssigned = currentAssigned -- prevAssigned - val revoked = prevAssigned -- currentAssigned - val unrequestedRecords = - bufferUnrequestedPartitions(records, tpsInResponse -- requestedPartitions) - - endRevoked( - state.pendingRequests, - state.addBufferedRecords(unrequestedRecords).bufferedRecords, - revoked(_) - ).flatMap { - case (pendingRequests, bufferedRecords) => - for { - output <- fulfillRequests(pendingRequests, bufferedRecords, records) - (notFulfilled, fulfilled) = output - _ <- deps.emitIfEnabledDiagnostic( - DiagnosticEvent.Poll( - requestedPartitions, - fulfilled.keySet, - notFulfilled.map(_.tp).toSet + val tpsInResponse = records.partitions.asScala + val currentAssigned = c.assignment().asScala + val newlyAssigned = currentAssigned -- prevAssigned + val revoked = prevAssigned -- currentAssigned + val unrequestedRecords = + bufferUnrequestedPartitions(records, tpsInResponse -- requestedPartitions) + + endRevoked( + state.pendingRequests, + state.addBufferedRecords(unrequestedRecords).bufferedRecords, + revoked(_) + ).flatMap { + case (pendingRequests, bufferedRecords) => + for { + output <- fulfillRequests(pendingRequests, bufferedRecords, records) + (notFulfilled, fulfilled) = output + _ <- deps.emitIfEnabledDiagnostic( + DiagnosticEvent.Poll( + requestedPartitions, + fulfilled.keySet, + notFulfilled.map(_.tp).toSet + ) ) - ) - } yield output - }.map((newlyAssigned.toSet, _)) + } yield output + }.map((newlyAssigned.toSet, _)) + } } } } @@ -396,9 +408,16 @@ object Runloop { } yield newState def handleShutdown(state: State, cmd: Command): BlockingTask[State] = cmd match { - case Command.Poll() => UIO.succeed(state) - case req @ Command.Request(_, _) => req.cont.fail(None).as(state) - case cmd @ Command.Commit(_, _) => handleCommit(state, cmd) + case Command.Poll() => UIO.succeed(state) + case Command.Request(tp, cont) => + state.bufferedRecords.get(tp) match { + case Some(recs) => + cont + .succeed(recs.map(CommittableRecord(_, commit(_)))) + .as(state.removeBufferedRecordsFor(tp)) + case None => cont.fail(None).as(state) + } + case cmd @ Command.Commit(_, _) => handleCommit(state, cmd) } ZStream diff --git a/src/test/scala/zio/kafka/client/ConsumerTest.scala b/src/test/scala/zio/kafka/client/ConsumerTest.scala index 9f960382f..a713adbaf 100644 --- a/src/test/scala/zio/kafka/client/ConsumerTest.scala +++ b/src/test/scala/zio/kafka/client/ConsumerTest.scala @@ -238,7 +238,7 @@ class ConsumerTest extends WordSpecLike with Matchers with LazyLogging with Defa for { kvs <- ZIO((1 to 5).toList.map(i => (s"key$i", s"msg$i"))) _ <- produceMany("topic150", kvs) - _ <- consumer.gracefulShutdown + _ <- consumer.stopConsumption records <- consumer .subscribeAnd(Subscription.Topics(Set("topic150"))) .plainStream(Serde.string, Serde.string) From 9d655f283947ba94b5d96b4d0faacf42d25ef6eb Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Fri, 1 Nov 2019 17:01:02 +0100 Subject: [PATCH 3/4] call handlePoll --- src/main/scala/zio/kafka/client/Runloop.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/zio/kafka/client/Runloop.scala b/src/main/scala/zio/kafka/client/Runloop.scala index 801314794..0fd51aa6f 100644 --- a/src/main/scala/zio/kafka/client/Runloop.scala +++ b/src/main/scala/zio/kafka/client/Runloop.scala @@ -408,7 +408,7 @@ object Runloop { } yield newState def handleShutdown(state: State, cmd: Command): BlockingTask[State] = cmd match { - case Command.Poll() => UIO.succeed(state) + case Command.Poll() => handlePoll(state) case Command.Request(tp, cont) => state.bufferedRecords.get(tp) match { case Some(recs) => From 1113c9636f8114b9070c23a1e2672082ee9aba9f Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Mon, 18 Nov 2019 11:03:21 -0500 Subject: [PATCH 4/4] eagerly resolve pending requests --- src/main/scala/zio/kafka/client/Runloop.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/scala/zio/kafka/client/Runloop.scala b/src/main/scala/zio/kafka/client/Runloop.scala index b682147ba..0e1ddfb4b 100644 --- a/src/main/scala/zio/kafka/client/Runloop.scala +++ b/src/main/scala/zio/kafka/client/Runloop.scala @@ -408,7 +408,14 @@ object Runloop { } yield newState def handleShutdown(state: State, cmd: Command): BlockingTask[State] = cmd match { - case Command.Poll() => handlePoll(state) + case Command.Poll() => + state.pendingRequests match { + case h :: t => + handleShutdown(state, h).flatMap { s => + handleShutdown(s.copy(pendingRequests = t), cmd) + } + case Nil => handlePoll(state) + } case Command.Request(tp, cont) => state.bufferedRecords.get(tp) match { case Some(recs) =>