diff --git a/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala b/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
index 255c85b..90b9f5f 100644
--- a/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
+++ b/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
@@ -51,12 +51,12 @@ trait TopicLoader {
       topics: NonEmptyList[String],
       strategy: LoadTopicStrategy,
       consumerSettings: ConsumerSettings[F, K, V]
-  ): Stream[F, ConsumerRecord[K, V]] = {
-    given Logger[F] = LoggerFactory[F].getLogger
-    KafkaConsumer
-      .stream(consumerSettings)
-      .flatMap(load(topics, strategy, _))
-  }
+  ): Stream[F, ConsumerRecord[K, V]] =
+    for {
+      given Logger[F] <- Stream.eval(LoggerFactory[F].create)
+      consumer <- KafkaConsumer.stream(consumerSettings)
+      record <- load(topics, strategy, consumer)
+    } yield record
 
   /** Stream that loads the specified topics from the beginning. When the latest current offsets are reached, the
     * `onLoad` callback is evaluated, and the stream continues.
@@ -72,9 +72,9 @@ trait TopicLoader {
       topics: NonEmptyList[String],
       consumerSettings: ConsumerSettings[F, K, V]
   )(onLoad: Resource.ExitCase => F[Unit]): Stream[F, ConsumerRecord[K, V]] = {
-    given Logger[F] = LoggerFactory[F].getLogger
-
-    def postLoad(logOffsets: NonEmptyMap[TopicPartition, LogOffsets]): Stream[F, ConsumerRecord[K, V]] =
+    def postLoad(
+        logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
+    )(using Logger[F]): Stream[F, ConsumerRecord[K, V]] =
       for {
         // The only consistent workaround for re-assigning offsets after the initial load is to re-create the consumer
         postLoadConsumer <- KafkaConsumer.stream(consumerSettings)
@@ -83,6 +83,7 @@ trait TopicLoader {
       } yield record
 
     for {
+      given Logger[F] <- Stream.eval(LoggerFactory[F].create)
      preloadConsumer <- KafkaConsumer.stream(consumerSettings)
      logOffsets <- Stream.eval(logOffsetsForTopics(topics, LoadAll, preloadConsumer)).flatMap(Stream.fromOption(_))
      _ <- Stream.eval(info"log offsets: ${logOffsets.show}")
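
For callers, nothing changes except that the `Logger[F]` is now created effectfully from the ambient `LoggerFactory[F]` inside the returned `Stream`, rather than summoned with `getLogger`. Below is a minimal usage sketch of the `load` entry point, assuming the library exposes a `TopicLoader` object implementing this trait and a `LoadAll` strategy in the same package, and using log4cats' `Slf4jFactory` for the `LoggerFactory[IO]` instance; the app name, topic, group id, and bootstrap servers are illustrative:

```scala
import cats.data.NonEmptyList
import cats.effect.{IO, IOApp}
import fs2.kafka.ConsumerSettings
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import uk.sky.fs2.kafka.topicloader.{LoadAll, TopicLoader}

object LoadExample extends IOApp.Simple {

  // The loader only needs a LoggerFactory[IO] in scope; it derives its
  // Logger[IO] internally via LoggerFactory[F].create.
  given LoggerFactory[IO] = Slf4jFactory.create[IO]

  private val consumerSettings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("topic-loader-example")

  override def run: IO[Unit] =
    TopicLoader
      .load(NonEmptyList.one("my-topic"), LoadAll, consumerSettings)
      .evalMap(record => IO.println(s"${record.key} -> ${record.value}"))
      .compile
      .drain
}
```

Since the logger is now allocated by `Stream.eval`, no logging dependency is touched until the stream is actually run, which keeps `load` referentially transparent end to end.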