From cd952fda65baed5b401dad1ecaa9bc77c1070e8f Mon Sep 17 00:00:00 2001
From: Ben Carter
Date: Thu, 19 May 2022 19:40:43 +0100
Subject: [PATCH] Remove deprecated methods (#113)

---
 .../sky/kafka/topicloader/TopicLoader.scala   |  63 +-------
 .../config/topicLoaderConfig.scala            |  15 +-
 src/test/scala/base/IntegrationSpecBase.scala |   1 -
 .../DeprecatedMethodsIntSpec.scala            | 139 ------------------
 4 files changed, 4 insertions(+), 214 deletions(-)
 delete mode 100644 src/test/scala/integration/DeprecatedMethodsIntSpec.scala

diff --git a/src/main/scala/uk/sky/kafka/topicloader/TopicLoader.scala b/src/main/scala/uk/sky/kafka/topicloader/TopicLoader.scala
index 8d34f381..84bf6f40 100644
--- a/src/main/scala/uk/sky/kafka/topicloader/TopicLoader.scala
+++ b/src/main/scala/uk/sky/kafka/topicloader/TopicLoader.scala
@@ -3,12 +3,12 @@ package uk.sky.kafka.topicloader
 
 import java.lang.{Long => JLong}
 import java.util.{List => JList, Map => JMap, Optional}
 
+import akka.Done
 import akka.actor.ActorSystem
 import akka.kafka.scaladsl.Consumer
 import akka.kafka.{ConsumerSettings, Subscriptions}
 import akka.stream.OverflowStrategy
 import akka.stream.scaladsl.{Flow, Keep, Source}
-import akka.{Done, NotUsed}
 import cats.data.NonEmptyList
 import cats.syntax.bifunctor._
 import cats.syntax.option._
@@ -23,7 +23,7 @@ import uk.sky.kafka.topicloader.config.{Config, TopicLoaderConfig}
 import scala.concurrent.Future
 import scala.jdk.CollectionConverters._
 
-object TopicLoader extends TopicLoader with DeprecatedMethods {
+object TopicLoader extends TopicLoader {
 
   private[topicloader] case class LogOffsets(lowest: Long, highest: Long)
 
   private case class HighestOffsetsWithRecord[K, V](
@@ -237,62 +237,3 @@ trait TopicLoader extends LazyLogging {
     HighestOffsetsWithRecord(updatedHighests, emittableRecord)
   }
 }
-
-trait DeprecatedMethods { self: TopicLoader =>
-
-  import TopicLoader._
-
-  @deprecated("Kept for backward compatibility until clients can adapt", "TopicLoader 1.2.8")
-  def apply[T](
-      strategy: LoadTopicStrategy,
-      topics: NonEmptyList[String],
-      onRecord: ConsumerRecord[String, T] => Future[_],
-      valueDeserializer: Deserializer[T]
-  )(implicit system: ActorSystem): Source[Map[TopicPartition, Long], NotUsed] =
-    fromTopics(strategy, topics, onRecord, valueDeserializer)
-
-  /** Consumes the records from the provided topics, passing them through `onRecord`.
-    */
-  @deprecated("Kept for backward compatibility until clients can adapt", "TopicLoader 1.3.0")
-  def fromTopics[T](
-      strategy: LoadTopicStrategy,
-      topics: NonEmptyList[String],
-      onRecord: ConsumerRecord[String, T] => Future[_],
-      valueDeserializer: Deserializer[T]
-  )(implicit system: ActorSystem): Source[Map[TopicPartition, Long], NotUsed] = {
-    val logOffsets = logOffsetsForTopics(topics, strategy)
-    deprecatedLoad(logOffsets, onRecord, valueDeserializer)
-  }
-
-  private val keySerializer = new StringDeserializer
-
-  /** Consumes the records from the provided partitions, passing them through `onRecord`.
-    */
-  @deprecated("Kept for backward compatibility until clients can adapt", "TopicLoader 1.3.0")
-  def fromPartitions[T](
-      strategy: LoadTopicStrategy,
-      partitions: NonEmptyList[TopicPartition],
-      onRecord: ConsumerRecord[String, T] => Future[_],
-      valueDeserializer: Deserializer[T]
-  )(implicit system: ActorSystem): Source[Map[TopicPartition, Long], NotUsed] = {
-    val logOffsets = logOffsetsForPartitions(partitions, strategy)
-    deprecatedLoad(logOffsets, onRecord, valueDeserializer)
-  }
-
-  private def deprecatedLoad[T](
-      logOffsets: Future[Map[TopicPartition, LogOffsets]],
-      onRecord: ConsumerRecord[String, T] => Future[_],
-      valueDeserializer: Deserializer[T]
-  )(implicit system: ActorSystem): Source[Map[TopicPartition, Long], NotUsed] = {
-    val config = Config.loadOrThrow(system.settings.config).topicLoader
-
-    import system.dispatcher
-
-    load(logOffsets, config, None)(keySerializer, valueDeserializer, system)
-      .mapMaterializedValue(_ => NotUsed)
-      .mapAsync(config.parallelism.value)(r => onRecord(r).map(_ => r))
-      .fold(logOffsets) { case (acc, _) => acc }
-      .flatMapConcat(Source.future)
-      .map(_.map { case (p, o) => p -> o.highest })
-  }
-}
diff --git a/src/main/scala/uk/sky/kafka/topicloader/config/topicLoaderConfig.scala b/src/main/scala/uk/sky/kafka/topicloader/config/topicLoaderConfig.scala
index 4293be47..68d179de 100644
--- a/src/main/scala/uk/sky/kafka/topicloader/config/topicLoaderConfig.scala
+++ b/src/main/scala/uk/sky/kafka/topicloader/config/topicLoaderConfig.scala
@@ -23,17 +23,9 @@ object PosInt {
 
 final case class Config(topicLoader: TopicLoaderConfig)
 
-/** @param parallelism
-  *   Determines how many Kafka records are processed in parallel by [[uk.sky.kafka.topicloader.TopicLoader]]. We
-  *   recommend using a parallelism > 1 if you are processing the records by sending them to an akka.actor.Actor. This
-  *   is so that messages are buffered in the akka.actor.Actor's mailbox, improving performance versus using a
-  *   parallelism of 1.
-  */
 final case class TopicLoaderConfig(
     idleTimeout: FiniteDuration,
-    bufferSize: PosInt,
-//    @deprecated("Kept for backward compatibility until clients can adapt", "TopicLoader 1.3.0")
-    parallelism: PosInt = PosInt.One
+    bufferSize: PosInt
 )
 
 object Config {
@@ -47,10 +39,7 @@ object Config {
     val bufferSize = PosInt(config.getInt(s"$basePath.buffer-size"))
       .validate(s"$basePath.buffer-size")
 
-    val parallelism = PosInt(config.getInt(s"$basePath.parallelism"))
-      .validate(s"$basePath.parallelism")
-
-    (idleTimeout, bufferSize, parallelism).mapN(TopicLoaderConfig.apply).map(Config.apply)
+    (idleTimeout, bufferSize).mapN(TopicLoaderConfig.apply).map(Config.apply)
   }
 
   def loadOrThrow(config: TypesafeConfig): Config = load(config) match {
diff --git a/src/test/scala/base/IntegrationSpecBase.scala b/src/test/scala/base/IntegrationSpecBase.scala
index 19d2681b..5f445388 100644
--- a/src/test/scala/base/IntegrationSpecBase.scala
+++ b/src/test/scala/base/IntegrationSpecBase.scala
@@ -41,7 +41,6 @@ abstract class IntegrationSpecBase extends WordSpecBase with Eventually {
        |topic-loader {
        |  idle-timeout = 5 minutes
        |  buffer-size = 1000
-       |  parallelism = 5
        |}
        |akka {
        |  loglevel = "OFF"
diff --git a/src/test/scala/integration/DeprecatedMethodsIntSpec.scala b/src/test/scala/integration/DeprecatedMethodsIntSpec.scala
deleted file mode 100644
index c0115cb6..00000000
--- a/src/test/scala/integration/DeprecatedMethodsIntSpec.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-package integration
-
-import akka.Done
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.pattern.ask
-import akka.stream.scaladsl.Sink
-import akka.util.Timeout
-import base.IntegrationSpecBase
-import cats.data.NonEmptyList
-import io.github.embeddedkafka.Codecs.{stringDeserializer, stringSerializer}
-import org.apache.kafka.clients.consumer._
-import org.apache.kafka.common.TopicPartition
-import uk.sky.kafka.topicloader._
-
-import scala.concurrent.Future
-import scala.jdk.CollectionConverters._
-
-@deprecated("Remove when deprecated methods are gone", "")
-class DeprecatedMethodsIntSpec extends IntegrationSpecBase {
-
-  "fromTopics" should {
-    "execute onRecord for all messages in provided topics" in new TestContext {
-      val store = new RecordStore
-      val (recordsTopic1, recordsTopic2) = records(1 to 30).splitAt(15)
-      val topics = NonEmptyList.of(testTopic1, testTopic2)
-
-      withRunningKafka {
-        createCustomTopics(topics)
-        publishToKafka(testTopic1, recordsTopic1)
-        publishToKafka(testTopic2, recordsTopic2)
-
-        TopicLoader
-          .fromTopics(LoadAll, topics, store.storeRecord, stringDeserializer)
-          .runWith(Sink.ignore)
-          .futureValue shouldBe Done
-
-        val processedRecords = store.getRecords.futureValue.map(recordToTuple)
-        processedRecords should contain theSameElementsAs (recordsTopic1 ++ recordsTopic2)
-      }
-    }
-
-    "emit last offsets consumed" in new TestContext with KafkaConsumer {
-      withRunningKafka {
-        createCustomTopic(testTopic1, partitions = 5)
-        publishToKafka(testTopic1, records(1 to 15))
-
-        val highestOffsets = withAssignedConsumer(autoCommit = false, offsetReset = "latest", testTopic1) { consumer =>
-          val tp = consumer.partitionsFor(testTopic1).asScala.map(pi => new TopicPartition(pi.topic, pi.partition))
-          consumer.endOffsets(tp.asJava).asScala
-        }
-
-        TopicLoader
-          .fromTopics[String](LoadAll, NonEmptyList.one(testTopic1), _ => Future.unit, stringDeserializer)
-          .runWith(Sink.seq)
-          .futureValue should contain theSameElementsAs Seq(highestOffsets)
-      }
-    }
-
-    "emit highest offsets even when not consumed anything" in new TestContext with KafkaConsumer {
-      withRunningKafka {
-        createCustomTopic(testTopic1, partitions = 5)
-
-        val lowestOffsets = withAssignedConsumer(autoCommit = false, offsetReset = "latest", testTopic1) { consumer =>
-          val tp = consumer.partitionsFor(testTopic1).asScala.map(pi => new TopicPartition(pi.topic, pi.partition))
-          consumer.beginningOffsets(tp.asJava).asScala
-        }
-
-        TopicLoader
-          .fromTopics[String](LoadCommitted, NonEmptyList.one(testTopic1), _ => ???, stringDeserializer)
-          .runWith(Sink.seq)
-          .futureValue should contain theSameElementsAs Seq(lowestOffsets)
-      }
-    }
-
-    "fail when store record is unsuccessful" in new TestContext {
-      val exception = new Exception("boom!")
-      val failingHandler: ConsumerRecord[String, String] => Future[Int] =
-        _ => Future.failed(exception)
-
-      withRunningKafka {
-        createCustomTopics(NonEmptyList.one(testTopic1))
-
-        publishToKafka(testTopic1, records(1 to 15))
-
-        TopicLoader
-          .fromTopics[String](LoadAll, NonEmptyList.one(testTopic1), failingHandler, stringDeserializer)
-          .runWith(Sink.seq)
-          .failed
-          .futureValue shouldBe exception
-      }
-    }
-  }
-
-  "fromPartitions" should {
-    "load data only from required partitions" in new TestContext with KafkaConsumer {
-      val recordsToPublish = records(1 to 15)
-      val partitionsToRead = NonEmptyList.of(1, 2)
-      val topicPartitions  = partitionsToRead.map(p => new TopicPartition(testTopic1, p))
-
-      withRunningKafka {
-        createCustomTopic(testTopic1, partitions = 5)
-        publishToKafka(testTopic1, recordsToPublish)
-        moveOffsetToEnd(testTopic1)
-
-        val store = new RecordStore()
-
-        TopicLoader
-          .fromPartitions(LoadAll, topicPartitions, store.storeRecord, stringDeserializer)
-          .runWith(Sink.ignore)
-          .futureValue shouldBe Done
-
-        store.getRecords.futureValue.map(_.partition).toSet should contain theSameElementsAs partitionsToRead.toList
-      }
-    }
-  }
-
-  class RecordStore()(implicit system: ActorSystem) {
-    private val storeActor = system.actorOf(Props(new Store))
-
-    def storeRecord(rec: ConsumerRecord[String, String])(implicit timeout: Timeout): Future[Int] =
-      (storeActor ? rec).mapTo[Int]
-
-    def getRecords(implicit timeout: Timeout): Future[List[ConsumerRecord[String, String]]] =
-      (storeActor ? Symbol("GET")).mapTo[List[ConsumerRecord[String, String]]]
-
-    private class Store extends Actor {
-      override def receive: Receive = store(List.empty)
-
-      def store(records: List[ConsumerRecord[_, _]]): Receive = {
-        case r: ConsumerRecord[_, _] =>
-          val newRecs = records :+ r
-          sender() ! newRecs.size
-          context.become(store(newRecs))
-        case Symbol("GET") =>
-          sender() ! records
-      }
-    }
-  }
-}
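
Migration note (editorial, not part of the patch): callers of the removed `fromTopics`/`fromPartitions` overloads can switch to the `load` method that remains on `TopicLoader`. The sketch below is a rough example only; it assumes the `load[K, V](topics, strategy)` signature documented in the project README, with deserializers resolved implicitly and the stream's completion (the materialized `Future[Done]`, per the `akka.Done` import added above) replacing the old materialized map of highest offsets. The topic name and `processRecord` are placeholders.

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import cats.data.NonEmptyList
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import uk.sky.kafka.topicloader.{LoadAll, TopicLoader}

import scala.concurrent.Future

object MigrationSketch extends App {
  implicit val system: ActorSystem = ActorSystem("migration-sketch")
  import system.dispatcher

  // Deserializers are now picked up implicitly rather than passed as arguments.
  implicit val stringDeserializer: Deserializer[String] = new StringDeserializer

  // Placeholder for whatever work the old `onRecord` callback performed.
  def processRecord(record: ConsumerRecord[String, String]): Future[Done] =
    Future.successful(Done)

  // Old (removed):
  //   TopicLoader.fromTopics(LoadAll, NonEmptyList.one("topic-to-load"), processRecord, stringDeserializer)
  // New: consume the emitted records yourself, choosing your own mapAsync
  // parallelism, since the `parallelism` config setting is removed by this patch.
  val done: Future[Done] =
    TopicLoader
      .load[String, String](NonEmptyList.one("topic-to-load"), LoadAll)
      .mapAsync(parallelism = 1)(processRecord)
      .runWith(Sink.ignore)

  done.onComplete(_ => system.terminate())
}

For partition-scoped loads (the old `fromPartitions`) no drop-in replacement is shown here; check the current public API before migrating.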