Skip to content

Commit

Permalink
Remove posint as akka checks anyway
Browse files Browse the repository at this point in the history
  • Loading branch information
bcarter97 committed Aug 6, 2022
1 parent 8d5515f commit c26e7f1
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 29 deletions.
17 changes: 6 additions & 11 deletions src/main/scala/uk/sky/kafka/topicloader/TopicLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import uk.sky.kafka.topicloader.config.{Config, TopicLoaderConfig}

import scala.concurrent.Future
import scala.jdk.CollectionConverters.*
import scala.util.Using

object TopicLoader extends TopicLoader {
private[topicloader] case class LogOffsets(lowest: Long, highest: Long)
Expand Down Expand Up @@ -76,10 +77,7 @@ trait TopicLoader extends LazyLogging {
strategy: LoadTopicStrategy,
maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]] = None
)(implicit system: ActorSystem): Source[ConsumerRecord[K, V], Future[Consumer.Control]] = {
val config =
Config
.loadOrThrow(system.settings.config)
.topicLoader
val config = Config.loadOrThrow(system.settings.config).topicLoader
load(logOffsetsForTopics(topics, strategy, config), config, maybeConsumerSettings)
}

Expand Down Expand Up @@ -202,10 +200,10 @@ trait TopicLoader extends LazyLogging {
startingOffsets: Map[TopicPartition, Long],
config: TopicLoaderConfig,
maybeConsumerSettings: Option[ConsumerSettings[Array[Byte], Array[Byte]]]
)(implicit system: ActorSystem) =
)(implicit system: ActorSystem): Source[ConsumerRecord[K, V], Consumer.Control] =
Consumer
.plainSource(consumerSettings(maybeConsumerSettings, config), Subscriptions.assignmentWithOffset(startingOffsets))
.buffer(config.bufferSize.value, OverflowStrategy.backpressure)
.buffer(config.bufferSize, OverflowStrategy.backpressure)
.map(cr => cr.bimap(_.deserialize[K](cr.topic), _.deserialize[V](cr.topic)))

def consumerSettings(
Expand All @@ -224,11 +222,8 @@ trait TopicLoader extends LazyLogging {

private def withStandaloneConsumer[T](
settings: ConsumerSettings[Array[Byte], Array[Byte]]
)(f: Consumer[Array[Byte], Array[Byte]] => T): T = {
val consumer = settings.createKafkaConsumer()
try f(consumer)
finally consumer.close()
}
)(f: Consumer[Array[Byte], Array[Byte]] => T): T =
Using.resource(settings.createKafkaConsumer())(f)

private def offsetsFrom(partitions: List[TopicPartition])(
f: JList[TopicPartition] => JMap[TopicPartition, JLong]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,11 @@ import com.typesafe.config.{Config as TypesafeConfig, ConfigException}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

final case class PosInt private (_value: Int) {
val value: Int = _value
}

object PosInt {
def apply(value: Int): Either[IllegalArgumentException, PosInt] =
if (value > 0) new PosInt(value).asRight
else new IllegalArgumentException(s"$value is not a positive Int").asLeft
}

final case class Config(topicLoader: TopicLoaderConfig)

final case class TopicLoaderConfig(
idleTimeout: FiniteDuration,
bufferSize: PosInt,
bufferSize: Int,
clientId: Option[String]
)

Expand All @@ -35,8 +25,7 @@ object Config {
FiniteDuration(config.getDuration(s"$basePath.idle-timeout").toNanos, TimeUnit.NANOSECONDS)
).validate(s"$basePath.idle-timeout")

val bufferSize = PosInt(config.getInt(s"$basePath.buffer-size"))
.validate(s"$basePath.buffer-size")
val bufferSize = Try(config.getInt(s"$basePath.buffer-size")).validate(s"$basePath.buffer-size")

val clientId = Try(
if (config.hasPath(s"$basePath.client-id")) Some(config.getString(s"$basePath.client-id"))
Expand Down
9 changes: 4 additions & 5 deletions src/test/scala/unit/ConfigSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.typesafe.config.{ConfigException, ConfigFactory}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import uk.sky.kafka.topicloader.TopicLoader.consumerSettings
import uk.sky.kafka.topicloader.config.Config
import uk.sky.kafka.topicloader.config.{Config, TopicLoaderConfig}

import scala.concurrent.duration.*

Expand Down Expand Up @@ -109,10 +109,9 @@ class ConfigSpec extends UnitSpecBase {
)
)

val config = Config.loadOrThrow(system.settings.config)
config.topicLoader.idleTimeout shouldBe 1.second
config.topicLoader.bufferSize.value shouldBe 10
config.topicLoader.clientId.value shouldBe "test-client-id"
val expected = Config(TopicLoaderConfig(1.second, 10, "test-client-id".some))
val config = Config.loadOrThrow(system.settings.config)
config shouldBe expected
}

"fail to load an invalid config" in {
Expand Down

0 comments on commit c26e7f1

Please sign in to comment.