Skip to content

Commit

Permalink
Destroy the preload consumer when complete in loadAndRun
Browse files Browse the repository at this point in the history
  • Loading branch information
bcarter97 committed Dec 12, 2024
1 parent 47572c0 commit 5de2801
Showing 1 changed file with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,17 @@ trait TopicLoader {
record <- postLoadConsumer.records.map(_.record)
} yield record

def stopConsumer(consumer: KafkaConsumer[F, K, V]) =
consumer.stopConsuming >> consumer.terminate >> consumer.awaitTermination

for {
given Logger[F] <- Stream.eval(LoggerFactory[F].create)
preloadConsumer <- KafkaConsumer.stream(consumerSettings)
logOffsets <- Stream.eval(logOffsetsForTopics(topics, LoadAll, preloadConsumer)).flatMap(Stream.fromOption(_))
_ <- Stream.eval(info"log offsets: ${logOffsets.show}")
record <- load(logOffsets, preloadConsumer).onFinalizeCase(onLoad) ++ postLoad(logOffsets)
record <- load(logOffsets, preloadConsumer).onFinalizeCase(onLoad) ++
Stream.exec(stopConsumer(preloadConsumer)) ++
postLoad(logOffsets)
} yield record
}

Expand Down

0 comments on commit 5de2801

Please sign in to comment.