Skip to content

Commit

Permalink
Allow throttling SQS
Browse files Browse the repository at this point in the history
Adds new config options to throttle SQS.
  • Loading branch information
AlexITC committed Aug 22, 2022
1 parent 6228304 commit 929833f
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 3 deletions.
6 changes: 6 additions & 0 deletions config/config.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
randomFactor: 0.1
maxRetries: 5
maxRetriesWithin: 1 minute

# Optional, allows throttling sqs
throttle {
elements = 10
per = 2 seconds
}
}

"output": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,13 @@ object EventsStreamModule {
val goodSink = sink(config.output.good, config.input, kinesisClient)
val badSink = sink(config.output.bad, config.input, kinesisClient)

sqsSource(config.input)
val source = config
.input
.throttle
.map(t => sqsSource(config.input).throttle(t.elements, t.per))
.getOrElse(sqsSource(config.input))

source
.via(sqsMsg2kinesisMsg(config.input))
.alsoTo(Flow[Either[ParsedMsg, ParsedMsg]].mapConcat(_.toSeq).to(goodSink))
.alsoTo(Flow[Either[ParsedMsg, ParsedMsg]].mapConcat(_.left.toSeq).to(badSink))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ object Sqs2KinesisConfig {
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRetries: Int,
maxRetriesWithin: FiniteDuration
maxRetriesWithin: FiniteDuration,
throttle: Option[ThrottleConfig]
)

case class ThrottleConfig(
elements: Int,
per: FiniteDuration
)

/** Configure the output kinesis stream
Expand Down Expand Up @@ -83,6 +89,9 @@ object Sqs2KinesisConfig {
implicit val finiteDurationEncoder: Encoder[FiniteDuration] =
implicitly[Encoder[String]].contramap(_.toString)

implicit val throttleDecoder: Decoder[ThrottleConfig] = deriveDecoder[ThrottleConfig]
implicit val throttleEncoder: Encoder[ThrottleConfig] = deriveEncoder[ThrottleConfig]

implicit val sqsDecoder: Decoder[SqsConfig] = deriveDecoder[SqsConfig]
implicit val sqsEncoder: Encoder[SqsConfig] = deriveEncoder[SqsConfig]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object EventsStreamModuleSpec {
val messageId: String = "b73f2c49-874e-47c4-9a76-09751e3dc3e2"

val config: Sqs2KinesisConfig.SqsConfig =
Sqs2KinesisConfig.SqsConfig("myqueue", "kinesisKey", 1.second, 1.second, 1.0, 1, 1.minute)
Sqs2KinesisConfig.SqsConfig("myqueue", "kinesisKey", 1.second, 1.second, 1.0, 1, 1.minute, None)

def buildMessage(body: String): Message = {
val keyAttribute = MessageAttributeValue.builder.dataType("String").stringValue(kinesisKey).build
Expand Down

0 comments on commit 929833f

Please sign in to comment.