Kinesis source improvements for larger deployments (#99)
Improvements to get common-streams ready for larger pods or for streams with
more shards, plus a fix to the latency measurement.

1. Configurable max leases to steal at one time. The KCL default is 1.
   In order to avoid latency during scale up/down and pod-rotation, we
   want the app to be quick to acquire shard-leases to process. With
   bigger instances we tend to have more shard-leases per instance, so
   we increase how aggressively it acquires leases (a worked example
   follows this list).
2. Set KCL `maxPendingProcessRecordsInput` to the minimum allowed (1).
   This is a precaution to avoid potential OOMs in case a single pod
   subscribes to a very large number of shards. It shouldn't have a
   negative performance impact, because common-streams apps tend to
   manage their own pre-fetching. In fact, this makes the Kinesis source
   more similar to the other sources, which don't have built-in
   pre-fetching.
3. Fixes a bug in which latency was set by the latest record in a batch,
   not the earliest.
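
For item 1, a minimal self-contained worked example of the new calculation; the helper name
and sample values are illustrative, and the real logic is `chooseMaxLeasesToStealAtOneTime`
in the diff below:

```scala
import scala.math.BigDecimal.RoundingMode

// Max leases to steal at one time = ceil(available processors * configured factor).
def maxLeasesToSteal(factor: BigDecimal, processors: Int): Int =
  (processors * factor).setScale(0, RoundingMode.UP).toInt

// With the reference.conf default factor of 2.0, an 8-core pod may steal up to
// 16 leases at once; rounding up guarantees a result of at least 1 for any
// positive factor, matching the KCL minimum.
assert(maxLeasesToSteal(BigDecimal(2.0), 8) == 16)
assert(maxLeasesToSteal(BigDecimal(0.42), 4) == 2) // ceil(1.68) = 2
```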
istreeter authored Nov 26, 2024
1 parent 200e4b5 commit 38aaf95
Showing 6 changed files with 61 additions and 19 deletions.
@@ -95,7 +95,8 @@ object Utils {
Some(endpoint),
Some(endpoint),
Some(endpoint),
10.seconds
10.seconds,
BigDecimal(1.0)
)

def getKinesisSinkConfig(endpoint: URI)(streamName: String): KinesisSinkConfig = KinesisSinkConfig(
1 change: 1 addition & 0 deletions modules/kinesis/src/main/resources/reference.conf
@@ -10,6 +10,7 @@ snowplow.defaults: {
maxRecords: 1000
}
leaseDuration: "10 seconds"
maxLeasesToStealAtOneTimeFactor: 2.0
}
}

@@ -68,13 +68,16 @@ private[kinesis] object KCLScheduler {
case KinesisSourceConfig.Retrieval.FanOut =>
new FanOutConfig(kinesisClient).streamName(kinesisConfig.streamName).applicationName(kinesisConfig.appName)
case KinesisSourceConfig.Retrieval.Polling(maxRecords) =>
new PollingConfig(kinesisConfig.streamName, kinesisClient).maxRecords(maxRecords)
val c = new PollingConfig(kinesisConfig.streamName, kinesisClient).maxRecords(maxRecords)
c.recordsFetcherFactory.maxPendingProcessRecordsInput(1)
c
}
}

val leaseManagementConfig =
configsBuilder.leaseManagementConfig
.failoverTimeMillis(kinesisConfig.leaseDuration.toMillis)
.maxLeasesToStealAtOneTime(chooseMaxLeasesToStealAtOneTime(kinesisConfig))

// We ask to see empty batches, so that we can update the health check even when there are no records in the stream
val processorConfig =
@@ -147,4 +150,14 @@
}
}

/**
* In order to avoid latency during scale up/down and pod-rotation, we want the app to be quick to
* acquire shard-leases to process. With bigger instances (more cores) we tend to have more
* shard-leases per instance, so we increase how aggressively it acquires leases.
*/
private def chooseMaxLeasesToStealAtOneTime(config: KinesisSourceConfig): Int =
(Runtime.getRuntime.availableProcessors * config.maxLeasesToStealAtOneTimeFactor)
.setScale(0, BigDecimal.RoundingMode.UP)
.toInt

}
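
For item 2, a standalone sketch of the same polling configuration wired up outside the
scheduler; the stream name, record limit, and region are placeholders rather than values
from this repo:

```scala
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.kinesis.retrieval.polling.PollingConfig

object PollingConfigSketch {
  // Placeholder client; the real app builds one from its own config.
  private val kinesisClient: KinesisAsyncClient =
    KinesisAsyncClient.builder().region(Region.EU_CENTRAL_1).build()

  val polling: PollingConfig =
    new PollingConfig("my-stream", kinesisClient).maxRecords(1000)

  // Buffer at most one fetched-but-unprocessed batch per shard. common-streams
  // apps do their own pre-fetching downstream, so this mainly bounds memory
  // when a single pod holds many shard leases.
  polling.recordsFetcherFactory.maxPendingProcessRecordsInput(1)
}
```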
@@ -79,13 +79,14 @@ object KinesisSource {
Sync[F].realTime.flatMap(now => liveness.set(now))

private def provideNextChunk(shardId: String, input: ProcessRecordsInput) = {
val chunk = Chunk.javaList(input.records()).map(_.data())
val lastRecord = input.records.asScala.last // last is safe because we handled the empty case above
val firstRecord = input.records.asScala.head
val checkpointable = Checkpointable.Record(
new ExtendedSequenceNumber(lastRecord.sequenceNumber, lastRecord.subSequenceNumber),
input.checkpointer
)
LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(lastRecord.approximateArrivalTimestamp))
LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(firstRecord.approximateArrivalTimestamp))
}

private def handleShardEnd[F[_]: Sync](
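
For item 3, an illustrative sketch (hypothetical, not the common-streams API) of why batch
latency should come from the first record: records in a `ProcessRecordsInput` are ordered by
sequence number, so the first is the oldest, and measuring from the last record under-reports
how stale the batch is:

```scala
import java.time.{Duration, Instant}

// Stand-in for the arrival-timestamp part of a KinesisClientRecord.
final case class Rec(approximateArrivalTimestamp: Instant)

// Latency of a non-empty batch, measured from its oldest (first) record.
def batchLatency(records: List[Rec], now: Instant): Duration =
  Duration.between(records.head.approximateArrivalTimestamp, now)

// Example: a batch whose first record arrived 5s ago and last record 1s ago
// has a latency of 5 seconds, not 1.
```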
@@ -16,6 +16,26 @@ import java.net.URI
import java.time.Instant
import scala.concurrent.duration.FiniteDuration

/**
* Config to be supplied from the app's hocon
*
* @param appName
* Corresponds to the DynamoDB table name
* @param workerIdentifier
* If a pod uses a consistent name, then whenever it restarts (e.g. after crashing or after a
* rollout) it can re-claim the leases it previously owned before the restart
* @param leaseDuration
* The KCL default for a lease is 10 seconds. If we increase this, then we can allow a pod longer
* to re-claim its old leases after a restart.
* @param maxLeasesToStealAtOneTimeFactor
* Controls how to pick the max number of leases to steal at one time. The actual max is this
* factor multiplied by the number of available processors, rounded up. In order to avoid latency during scale
* up/down and pod-rotation, we want the app to be quick to acquire shard-leases to process. With
* bigger instances (more cores/processors) we tend to have more shard-leases per instance, so we
* increase how aggressively it acquires leases.
*
* Other params are self-explanatory
*/
case class KinesisSourceConfig(
appName: String,
streamName: String,
@@ -25,7 +45,8 @@ case class KinesisSourceConfig(
customEndpoint: Option[URI],
dynamodbCustomEndpoint: Option[URI],
cloudwatchCustomEndpoint: Option[URI],
leaseDuration: FiniteDuration
leaseDuration: FiniteDuration,
maxLeasesToStealAtOneTimeFactor: BigDecimal
)

object KinesisSourceConfig {
@@ -39,7 +39,8 @@ class KinesisSourceConfigSpec extends Specification {
"initialPosition": {
"type": "TrimHorizon"
},
"leaseDuration": "20 seconds"
"leaseDuration": "20 seconds",
"maxLeasesToStealAtOneTimeFactor": 0.42
}
"""

@@ -50,7 +51,8 @@
c.workerIdentifier must beEqualTo("my-identifier"),
c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
c.leaseDuration must beEqualTo(20.seconds)
c.leaseDuration must beEqualTo(20.seconds),
c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42))
).reduce(_ and _)
}
}
@@ -68,7 +70,8 @@
"initialPosition": {
"type": "TRIM_HORIZON"
},
"leaseDuration": "20 seconds"
"leaseDuration": "20 seconds",
"maxLeasesToStealAtOneTimeFactor": 0.42
}
"""

@@ -79,7 +82,8 @@
c.workerIdentifier must beEqualTo("my-identifier"),
c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
c.leaseDuration must beEqualTo(20.seconds)
c.leaseDuration must beEqualTo(20.seconds),
c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42))
).reduce(_ and _)
}
}
@@ -98,15 +102,16 @@ class KinesisSourceConfigSpec extends Specification {
val result = ConfigFactory.load(ConfigFactory.parseString(input))

val expected = KinesisSourceConfig(
appName = "my-app",
streamName = "my-stream",
workerIdentifier = System.getenv("HOSTNAME"),
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds
appName = "my-app",
streamName = "my-stream",
workerIdentifier = System.getenv("HOSTNAME"),
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0)
)

result.as[Wrapper] must beRight.like { case w: Wrapper =>