Skip to content

Commit

Permalink
Remove deprecated methods (#113)
Browse files Browse the repository at this point in the history
  • Loading branch information
bcarter97 authored May 19, 2022
1 parent 7c470d3 commit cd952fd
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 214 deletions.
63 changes: 2 additions & 61 deletions src/main/scala/uk/sky/kafka/topicloader/TopicLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package uk.sky.kafka.topicloader
import java.lang.{Long => JLong}
import java.util.{List => JList, Map => JMap, Optional}

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Keep, Source}
import akka.{Done, NotUsed}
import cats.data.NonEmptyList
import cats.syntax.bifunctor._
import cats.syntax.option._
Expand All @@ -23,7 +23,7 @@ import uk.sky.kafka.topicloader.config.{Config, TopicLoaderConfig}
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

object TopicLoader extends TopicLoader with DeprecatedMethods {
object TopicLoader extends TopicLoader {
private[topicloader] case class LogOffsets(lowest: Long, highest: Long)

private case class HighestOffsetsWithRecord[K, V](
Expand Down Expand Up @@ -237,62 +237,3 @@ trait TopicLoader extends LazyLogging {
HighestOffsetsWithRecord(updatedHighests, emittableRecord)
}
}

trait DeprecatedMethods { self: TopicLoader =>

import TopicLoader._

@deprecated("Kept for backward compatibility until clients can adapt", "TopicLoader 1.2.8")
def apply[T](
strategy: LoadTopicStrategy,
topics: NonEmptyList[String],
onRecord: ConsumerRecord[String, T] => Future[_],
valueDeserializer: Deserializer[T]
)(implicit system: ActorSystem): Source[Map[TopicPartition, Long], NotUsed] =
fromTopics(strategy, topics, onRecord, valueDeserializer)

/** Consumes the records from the provided topics, passing them through `onRecord`.
*/
@deprecated("Kept for backward compatibility until clients can adapt", "TopicLoader 1.3.0")
def fromTopics[T](
strategy: LoadTopicStrategy,
topics: NonEmptyList[String],
onRecord: ConsumerRecord[String, T] => Future[_],
valueDeserializer: Deserializer[T]
)(implicit system: ActorSystem): Source[Map[TopicPartition, Long], NotUsed] = {
val logOffsets = logOffsetsForTopics(topics, strategy)
deprecatedLoad(logOffsets, onRecord, valueDeserializer)
}

private val keySerializer = new StringDeserializer

/** Consumes the records from the provided partitions, passing them through `onRecord`.
*/
@deprecated("Kept for backward compatibility until clients can adapt", "TopicLoader 1.3.0")
def fromPartitions[T](
strategy: LoadTopicStrategy,
partitions: NonEmptyList[TopicPartition],
onRecord: ConsumerRecord[String, T] => Future[_],
valueDeserializer: Deserializer[T]
)(implicit system: ActorSystem): Source[Map[TopicPartition, Long], NotUsed] = {
val logOffsets = logOffsetsForPartitions(partitions, strategy)
deprecatedLoad(logOffsets, onRecord, valueDeserializer)
}

private def deprecatedLoad[T](
logOffsets: Future[Map[TopicPartition, LogOffsets]],
onRecord: ConsumerRecord[String, T] => Future[_],
valueDeserializer: Deserializer[T]
)(implicit system: ActorSystem): Source[Map[TopicPartition, Long], NotUsed] = {
val config = Config.loadOrThrow(system.settings.config).topicLoader

import system.dispatcher

load(logOffsets, config, None)(keySerializer, valueDeserializer, system)
.mapMaterializedValue(_ => NotUsed)
.mapAsync(config.parallelism.value)(r => onRecord(r).map(_ => r))
.fold(logOffsets) { case (acc, _) => acc }
.flatMapConcat(Source.future)
.map(_.map { case (p, o) => p -> o.highest })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,9 @@ object PosInt {

final case class Config(topicLoader: TopicLoaderConfig)

/** @param parallelism
* Determines how many Kafka records are processed in parallel by [[uk.sky.kafka.topicloader.TopicLoader]]. We
* recommend using a parallelism > 1 if you are processing the records by sending them to an akka.actor.Actor. This
* is so that messages are buffered in the akka.actor.Actor's mailbox, improving performance versus using a
* parallelism of 1.
*/
final case class TopicLoaderConfig(
idleTimeout: FiniteDuration,
bufferSize: PosInt,
// @deprecated("Kept for backward compatibility until clients can adapt", "TopicLoader 1.3.0")
parallelism: PosInt = PosInt.One
bufferSize: PosInt
)

object Config {
Expand All @@ -47,10 +39,7 @@ object Config {
val bufferSize = PosInt(config.getInt(s"$basePath.buffer-size"))
.validate(s"$basePath.buffer-size")

val parallelism = PosInt(config.getInt(s"$basePath.parallelism"))
.validate(s"$basePath.parallelism")

(idleTimeout, bufferSize, parallelism).mapN(TopicLoaderConfig.apply).map(Config.apply)
(idleTimeout, bufferSize).mapN(TopicLoaderConfig.apply).map(Config.apply)
}

def loadOrThrow(config: TypesafeConfig): Config = load(config) match {
Expand Down
1 change: 0 additions & 1 deletion src/test/scala/base/IntegrationSpecBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ abstract class IntegrationSpecBase extends WordSpecBase with Eventually {
|topic-loader {
| idle-timeout = 5 minutes
| buffer-size = 1000
| parallelism = 5
|}
|akka {
| loglevel = "OFF"
Expand Down
139 changes: 0 additions & 139 deletions src/test/scala/integration/DeprecatedMethodsIntSpec.scala

This file was deleted.

0 comments on commit cd952fd

Please sign in to comment.