diff --git a/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala b/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala index 90b9f5f..fd9ffb6 100644 --- a/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala +++ b/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala @@ -82,12 +82,17 @@ trait TopicLoader { record <- postLoadConsumer.records.map(_.record) } yield record + def stopConsumer(consumer: KafkaConsumer[F, K, V]) = + consumer.stopConsuming >> consumer.terminate >> consumer.awaitTermination + for { given Logger[F] <- Stream.eval(LoggerFactory[F].create) preloadConsumer <- KafkaConsumer.stream(consumerSettings) logOffsets <- Stream.eval(logOffsetsForTopics(topics, LoadAll, preloadConsumer)).flatMap(Stream.fromOption(_)) _ <- Stream.eval(info"log offsets: ${logOffsets.show}") - record <- load(logOffsets, preloadConsumer).onFinalizeCase(onLoad) ++ postLoad(logOffsets) + record <- load(logOffsets, preloadConsumer).onFinalizeCase(onLoad) ++ + Stream.exec(stopConsumer(preloadConsumer)) ++ + postLoad(logOffsets) } yield record }