From b665f72a6bab255baca7e4e2986ee0f79c493a52 Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Mon, 22 Aug 2022 11:52:39 -0600 Subject: [PATCH] Allow throttling SQS Adds new config options to throttle SQS. --- config/config.reference.hocon | 6 ++++++ .../sqs2kinesis/EventsStreamModule.scala | 8 +++++++- .../sqs2kinesis/config/Sqs2KinesisConfig.scala | 11 ++++++++++- .../EventsStreamModuleSpec.scala | 2 +- .../config/CliConfigSpec.scala | 6 ++++-- 5 files changed, 28 insertions(+), 5 deletions(-) diff --git a/config/config.reference.hocon b/config/config.reference.hocon index 2d4125f..d47546f 100644 --- a/config/config.reference.hocon +++ b/config/config.reference.hocon @@ -13,6 +13,12 @@ randomFactor: 0.1 maxRetries: 5 maxRetriesWithin: 1 minute + + # Optional, allows throttling sqs + throttle { + elements = 10 + per = 2 seconds + } } "output": { diff --git a/src/main/scala/com.snowplowanalytics/sqs2kinesis/EventsStreamModule.scala b/src/main/scala/com.snowplowanalytics/sqs2kinesis/EventsStreamModule.scala index fe05745..6184d70 100644 --- a/src/main/scala/com.snowplowanalytics/sqs2kinesis/EventsStreamModule.scala +++ b/src/main/scala/com.snowplowanalytics/sqs2kinesis/EventsStreamModule.scala @@ -69,7 +69,13 @@ object EventsStreamModule { val goodSink = sink(config.output.good, config.input, kinesisClient) val badSink = sink(config.output.bad, config.input, kinesisClient) - sqsSource(config.input) + val source = config + .input + .throttle + .map(t => sqsSource(config.input).throttle(t.elements, t.per)) + .getOrElse(sqsSource(config.input)) + + source .via(sqsMsg2kinesisMsg(config.input)) .alsoTo(Flow[Either[ParsedMsg, ParsedMsg]].mapConcat(_.toSeq).to(goodSink)) .alsoTo(Flow[Either[ParsedMsg, ParsedMsg]].mapConcat(_.left.toSeq).to(badSink)) diff --git a/src/main/scala/com.snowplowanalytics/sqs2kinesis/config/Sqs2KinesisConfig.scala b/src/main/scala/com.snowplowanalytics/sqs2kinesis/config/Sqs2KinesisConfig.scala index b4f66dd..dd71dc8 100644 --- a/src/main/scala/com.snowplowanalytics/sqs2kinesis/config/Sqs2KinesisConfig.scala +++ b/src/main/scala/com.snowplowanalytics/sqs2kinesis/config/Sqs2KinesisConfig.scala @@ -50,7 +50,13 @@ object Sqs2KinesisConfig { maxBackoff: FiniteDuration, randomFactor: Double, maxRetries: Int, - maxRetriesWithin: FiniteDuration + maxRetriesWithin: FiniteDuration, + throttle: Option[ThrottleConfig] + ) + + case class ThrottleConfig( + elements: Int, + per: FiniteDuration ) /** Configure the output kinesis stream @@ -83,6 +89,9 @@ object Sqs2KinesisConfig { implicit val finiteDurationEncoder: Encoder[FiniteDuration] = implicitly[Encoder[String]].contramap(_.toString) + implicit val throttleDecoder: Decoder[ThrottleConfig] = deriveDecoder[ThrottleConfig] + implicit val throttleEncoder: Encoder[ThrottleConfig] = deriveEncoder[ThrottleConfig] + implicit val sqsDecoder: Decoder[SqsConfig] = deriveDecoder[SqsConfig] implicit val sqsEncoder: Encoder[SqsConfig] = deriveEncoder[SqsConfig] diff --git a/src/test/scala/com.snowplowanalytics.sqs2kinesis/EventsStreamModuleSpec.scala b/src/test/scala/com.snowplowanalytics.sqs2kinesis/EventsStreamModuleSpec.scala index b158ae1..209db2b 100644 --- a/src/test/scala/com.snowplowanalytics.sqs2kinesis/EventsStreamModuleSpec.scala +++ b/src/test/scala/com.snowplowanalytics.sqs2kinesis/EventsStreamModuleSpec.scala @@ -74,7 +74,7 @@ object EventsStreamModuleSpec { val messageId: String = "b73f2c49-874e-47c4-9a76-09751e3dc3e2" val config: Sqs2KinesisConfig.SqsConfig = - Sqs2KinesisConfig.SqsConfig("myqueue", "kinesisKey", 1.second, 1.second, 1.0, 1, 1.minute) + Sqs2KinesisConfig.SqsConfig("myqueue", "kinesisKey", 1.second, 1.second, 1.0, 1, 1.minute, None) def buildMessage(body: String): Message = { val keyAttribute = MessageAttributeValue.builder.dataType("String").stringValue(kinesisKey).build diff --git a/src/test/scala/com.snowplowanalytics.sqs2kinesis/config/CliConfigSpec.scala b/src/test/scala/com.snowplowanalytics.sqs2kinesis/config/CliConfigSpec.scala index 1e8aac2..75f1a68 100644 --- a/src/test/scala/com.snowplowanalytics.sqs2kinesis/config/CliConfigSpec.scala +++ b/src/test/scala/com.snowplowanalytics.sqs2kinesis/config/CliConfigSpec.scala @@ -31,7 +31,8 @@ class CliConfigSpec extends Specification { 5.second, 0.1, 5, - 1.minute + 1.minute, + None ), Output( KinesisConfig("test-stream-payloads", 5000000, 500, 1.second, 500.millis, 1.second, 0.1, 5), @@ -52,7 +53,8 @@ class CliConfigSpec extends Specification { 5.second, 0.1, 5, - 1.minute + 1.minute, + Some(ThrottleConfig(10, 2.seconds)) ), Output( KinesisConfig("test-stream-payloads", 5000000, 500, 1.second, 500.millis, 1.second, 0.1, 5),