Skip to content

Commit

Permalink
Use effectful logger create (#113)
Browse files Browse the repository at this point in the history
  • Loading branch information
bcarter97 authored Oct 2, 2024
1 parent 159a187 commit 47572c0
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ trait TopicLoader {
topics: NonEmptyList[String],
strategy: LoadTopicStrategy,
consumerSettings: ConsumerSettings[F, K, V]
): Stream[F, ConsumerRecord[K, V]] = {
given Logger[F] = LoggerFactory[F].getLogger
KafkaConsumer
.stream(consumerSettings)
.flatMap(load(topics, strategy, _))
}
): Stream[F, ConsumerRecord[K, V]] =
for {
given Logger[F] <- Stream.eval(LoggerFactory[F].create)
consumer <- KafkaConsumer.stream(consumerSettings)
record <- load(topics, strategy, consumer)
} yield record

/** Stream that loads the specified topics from the beginning. When the latest current offsets are reached, the
* `onLoad` callback is evaluated, and the stream continues.
Expand All @@ -72,9 +72,9 @@ trait TopicLoader {
topics: NonEmptyList[String],
consumerSettings: ConsumerSettings[F, K, V]
)(onLoad: Resource.ExitCase => F[Unit]): Stream[F, ConsumerRecord[K, V]] = {
given Logger[F] = LoggerFactory[F].getLogger

def postLoad(logOffsets: NonEmptyMap[TopicPartition, LogOffsets]): Stream[F, ConsumerRecord[K, V]] =
def postLoad(
logOffsets: NonEmptyMap[TopicPartition, LogOffsets]
)(using Logger[F]): Stream[F, ConsumerRecord[K, V]] =
for {
// The only consistent workaround for re-assigning offsets after the initial load is to re-create the consumer
postLoadConsumer <- KafkaConsumer.stream(consumerSettings)
Expand All @@ -83,6 +83,7 @@ trait TopicLoader {
} yield record

for {
given Logger[F] <- Stream.eval(LoggerFactory[F].create)
preloadConsumer <- KafkaConsumer.stream(consumerSettings)
logOffsets <- Stream.eval(logOffsetsForTopics(topics, LoadAll, preloadConsumer)).flatMap(Stream.fromOption(_))
_ <- Stream.eval(info"log offsets: ${logOffsets.show}")
Expand Down

0 comments on commit 47572c0

Please sign in to comment.