Skip to content

Commit

Permalink
enrich-kinesis: Cap recordTtl in KPL (close #581)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Mar 25, 2022
1 parent 2dba8c5 commit bd46542
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 9 deletions.
15 changes: 15 additions & 0 deletions config/config.kinesis.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,16 @@
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}

# Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
"maxBufferedTime": 100 millis

# Optional. The KPL will consider a record failed if it cannot be sent within this deadline.
# The KPL then yields back to the JVM, which will log the error, and might retry sending.
"recordTtl": 20 seconds

# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
"collection": {
# Maximum number of Kinesis records to pack into a PutRecords request
Expand Down Expand Up @@ -166,11 +171,16 @@
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}

# Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
"maxBufferedTime": 100 millis

# Optional. The KPL will consider a record failed if it cannot be sent within this deadline.
# The KPL then yields back to the JVM, which will log the error, and might retry sending.
"recordTtl": 20 seconds

# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
"collection": {
# Maximum number of Kinesis records to pack into a PutRecords request
Expand Down Expand Up @@ -239,11 +249,16 @@
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}

# Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
"maxBufferedTime": 100 millis

# Optional. The KPL will consider a record failed if it cannot be sent within this deadline.
# The KPL then yields back to the JVM, which will log the error, and might retry sending.
"recordTtl": 20 seconds

# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
"collection": {
# Maximum number of Kinesis records to pack into a PutRecords request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object ConfigFile {
case ConfigFile(_, _, _, Some(aup), _, _, _) if aup._1 <= 0L =>
"assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype
// Remove pii output if streamName and region empty
case c @ ConfigFile(_, Outputs(good, Some(Output.Kinesis(s, _, _, _, _, _, _, _, _, _, _, _, _)), bad), _, _, _, _, _) if s.isEmpty =>
case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _) if output.streamName.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _, _) if t.isEmpty =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ object ParsedConfigs {
if (invalidAttributes.nonEmpty) NonEmptyList(invalidAttributes.head, invalidAttributes.tail.toList).invalid
else output.valid
}
case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _, _, _, _, _, _, _) if !enrichedFieldsMap.contains(key) =>
case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _, _, _, _, _, _, _, _) if !enrichedFieldsMap.contains(key) =>
NonEmptyList.one(s"Partition key $key not valid").invalid
case _ =>
output.valid
Expand All @@ -123,7 +123,7 @@ object ParsedConfigs {
attributes.contains(s)
}
attributesFromFields(fields)
case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _, _, _, _, _, _, _) =>
case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _, _, _, _, _, _, _, _) =>
val fields = ParsedConfigs.enrichedFieldsMap.filter {
case (s, _) =>
s == key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ object io {
region: Option[String],
partitionKey: Option[String],
backoffPolicy: BackoffPolicy,
recordTtl: FiniteDuration,
maxBufferedTime: FiniteDuration,
collection: Collection,
aggregation: Option[Aggregation],
Expand All @@ -212,7 +213,11 @@ object io {
cloudwatchPort: Option[Long]
) extends Output

case class BackoffPolicy(minBackoff: FiniteDuration, maxBackoff: FiniteDuration)
case class BackoffPolicy(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
maxRetries: Int
)
object BackoffPolicy {
implicit def backoffPolicyDecoder: Decoder[BackoffPolicy] =
deriveConfiguredDecoder[BackoffPolicy]
Expand Down Expand Up @@ -246,7 +251,7 @@ object io {
case _ =>
s"Topic must conform projects/project-name/topics/topic-name format, $top given".asLeft
}
case Kinesis(s, r, _, _, _, _, _, _, _, _, _, _, _) if s.isEmpty && r.nonEmpty =>
case k: Kinesis if k.streamName.isEmpty && k.region.nonEmpty =>
"streamName needs to be set".asLeft
case other => other.asRight
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ class ConfigFileSpec extends Specification with CatsIO {
"enriched",
Some("eu-central-1"),
None,
io.Output.BackoffPolicy(100.millis, 10.seconds),
io.Output.BackoffPolicy(100.millis, 10.seconds, 10),
20.seconds,
100.millis,
io.Output.Collection(500, 5242880),
None,
Expand All @@ -103,7 +104,8 @@ class ConfigFileSpec extends Specification with CatsIO {
"pii",
Some("eu-central-1"),
None,
io.Output.BackoffPolicy(100.millis, 10.seconds),
io.Output.BackoffPolicy(100.millis, 10.seconds, 10),
20.seconds,
100.millis,
io.Output.Collection(500, 5242880),
None,
Expand All @@ -119,7 +121,8 @@ class ConfigFileSpec extends Specification with CatsIO {
"bad",
Some("eu-central-1"),
None,
io.Output.BackoffPolicy(100.millis, 10.seconds),
io.Output.BackoffPolicy(100.millis, 10.seconds, 10),
20.seconds,
100.millis,
io.Output.Collection(500, 5242880),
None,
Expand Down
6 changes: 6 additions & 0 deletions modules/kinesis/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}
"maxBufferedTime": 100 milliseconds
"recordTtl": 20 seconds
"collection": {
"maxCount": 500
"maxSize": 5242880
Expand All @@ -35,8 +37,10 @@
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}
"maxBufferedTime": 100 milliseconds
"recordTtl": 20 seconds
"collection": {
"maxCount": 500
"maxSize": 5242880
Expand All @@ -50,8 +54,10 @@
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}
"maxBufferedTime": 100 milliseconds
"recordTtl": 20 seconds
"collection": {
"maxCount": 500
"maxSize": 5242880
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ object Sink {
.setCollectionMaxSize(config.collection.maxSize)
.setMaxConnections(config.maxConnections)
.setLogLevel(config.logLevel)
.setRecordTtl(Long.MaxValue) // retry records forever
.setRecordTtl(config.recordTtl.toMillis)

// See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
val withAggregation = config.aggregation match {
Expand Down Expand Up @@ -124,6 +124,7 @@ object Sink {
data: AttributedData[Array[Byte]]
): F[Unit] = {
val retryPolicy = capDelay[F](config.backoffPolicy.maxBackoff, fullJitter[F](config.backoffPolicy.minBackoff))
.join(limitRetries(config.backoffPolicy.maxRetries))
val partitionKey = data.attributes.toList match { // there can be only one attribute : the partition key
case head :: Nil => head._2
case _ => UUID.randomUUID().toString
Expand Down

0 comments on commit bd46542

Please sign in to comment.