From f07cc4b1e2c576bd01d22705a91ae5cb653bd372 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Mon, 25 Nov 2024 10:15:49 +0000 Subject: [PATCH] Kinesis source improvements for larger deployments Two improvements getting ready for using common-streams on larger pods or with more shards. 1. Configurable max leases to steal at one time. The KCL default is 1. In order to avoid latency during scale up/down and pod-ration, we want the app to be quick to acquire shard-leases to process. With bigger instances we tend to have more shard-leases per instance, so we increase how aggressivley it acquires leases. 2. Set KCL `maxPendingProcessRecordsInput` to the minimum allowed (1). This is a precaution to avoid potentials OOMs in case a single pod subscribes to a very large number of shards. It shouldn't have a negative performance impact, because common-streams apps tend to manage their own pre-fetching. In fact, this makes the Kinesis source more similar to the other Sources which don't have in-built pre-fetching. 3. Fixes a bug in which latency was set by the latest record in a batch, not the earliest. --- .../snowplow/sources/kinesis/Utils.scala | 3 +- .../kinesis/src/main/resources/reference.conf | 1 + .../sources/kinesis/KCLScheduler.scala | 15 ++++++++- .../sources/kinesis/KinesisSource.scala | 7 +++-- .../sources/kinesis/KinesisSourceConfig.scala | 23 +++++++++++++- .../kinesis/KinesisSourceConfigSpec.scala | 31 +++++++++++-------- 6 files changed, 61 insertions(+), 19 deletions(-) diff --git a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala index fdcd7ae..61fc338 100644 --- a/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala +++ b/modules/it/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/Utils.scala @@ -95,7 +95,8 @@ object Utils { Some(endpoint), Some(endpoint), Some(endpoint), - 10.seconds + 10.seconds, + BigDecimal(1.0) ) def getKinesisSinkConfig(endpoint: URI)(streamName: String): KinesisSinkConfig = KinesisSinkConfig( diff --git a/modules/kinesis/src/main/resources/reference.conf b/modules/kinesis/src/main/resources/reference.conf index ab3b659..032d7e5 100644 --- a/modules/kinesis/src/main/resources/reference.conf +++ b/modules/kinesis/src/main/resources/reference.conf @@ -10,6 +10,7 @@ snowplow.defaults: { maxRecords: 1000 } leaseDuration: "10 seconds" + maxLeasesToStealAtOneTimeFactor: 2.0 } } diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLScheduler.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLScheduler.scala index 55ae8bd..4b24ae5 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLScheduler.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLScheduler.scala @@ -68,13 +68,16 @@ private[kinesis] object KCLScheduler { case KinesisSourceConfig.Retrieval.FanOut => new FanOutConfig(kinesisClient).streamName(kinesisConfig.streamName).applicationName(kinesisConfig.appName) case KinesisSourceConfig.Retrieval.Polling(maxRecords) => - new PollingConfig(kinesisConfig.streamName, kinesisClient).maxRecords(maxRecords) + val c = new PollingConfig(kinesisConfig.streamName, kinesisClient).maxRecords(maxRecords) + c.recordsFetcherFactory.maxPendingProcessRecordsInput(1) + c } } val leaseManagementConfig = configsBuilder.leaseManagementConfig .failoverTimeMillis(kinesisConfig.leaseDuration.toMillis) + .maxLeasesToStealAtOneTime(chooseMaxLeasesToStealAtOneTime(kinesisConfig)) // We ask to see empty batches, so that we can update the health check even when there are no records in the stream val processorConfig = @@ -147,4 +150,14 @@ private[kinesis] object KCLScheduler { } } + /** + * In order to avoid latency during scale up/down and pod-rotation, we want the app to be quick to + * acquire shard-leases to process. With bigger instances (more cores) we tend to have more + * shard-leases per instance, so we increase how aggressively it acquires leases. + */ + private def chooseMaxLeasesToStealAtOneTime(config: KinesisSourceConfig): Int = + (Runtime.getRuntime.availableProcessors * config.maxLeasesToStealAtOneTimeFactor) + .setScale(0, BigDecimal.RoundingMode.UP) + .toInt + } diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala index 25d256c..4ef3487 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala @@ -79,13 +79,14 @@ object KinesisSource { Sync[F].realTime.flatMap(now => liveness.set(now)) private def provideNextChunk(shardId: String, input: ProcessRecordsInput) = { - val chunk = Chunk.javaList(input.records()).map(_.data()) - val lastRecord = input.records.asScala.last // last is safe because we handled the empty case above + val chunk = Chunk.javaList(input.records()).map(_.data()) + val lastRecord = input.records.asScala.last // last is safe because we handled the empty case above + val firstRecord = input.records.asScala.head val checkpointable = Checkpointable.Record( new ExtendedSequenceNumber(lastRecord.sequenceNumber, lastRecord.subSequenceNumber), input.checkpointer ) - LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(lastRecord.approximateArrivalTimestamp)) + LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(firstRecord.approximateArrivalTimestamp)) } private def handleShardEnd[F[_]: Sync]( diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala index 5667e20..403f35f 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfig.scala @@ -16,6 +16,26 @@ import java.net.URI import java.time.Instant import scala.concurrent.duration.FiniteDuration +/** + * Config to be supplied from the app's hocon + * + * @param appName + * Corresponds to the DynamoDB table name + * @param workerIdentifier + * If a pod uses a consistent name, then whenever the pod restarts (e.g. after crashing or after a + * rollout) then the pod can re-claim leases that it previously owned before the restart + * @param leaseDuration + * The KCL default for a lease is 10 seconds. If we increase this, then we can allow a pod longer + * to re-claim its old leases after a restart. + * @param maxLeasesToStealAtOneTimeFactor + * Controls how to pick the max number of leases to steal at one time. The actual max number to + * steal is multiplied by the number of runtime processors. In order to avoid latency during scale + * up/down and pod-rotation, we want the app to be quick to acquire shard-leases to process. With + * bigger instances (more cores/processors) we tend to have more shard-leases per instance, so we + * increase how aggressively it acquires leases. + * + * Other params are self-explanatory + */ case class KinesisSourceConfig( appName: String, streamName: String, @@ -25,7 +45,8 @@ case class KinesisSourceConfig( customEndpoint: Option[URI], dynamodbCustomEndpoint: Option[URI], cloudwatchCustomEndpoint: Option[URI], - leaseDuration: FiniteDuration + leaseDuration: FiniteDuration, + maxLeasesToStealAtOneTimeFactor: BigDecimal ) object KinesisSourceConfig { diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala index ab0c4d6..e6e4c91 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSourceConfigSpec.scala @@ -39,7 +39,8 @@ class KinesisSourceConfigSpec extends Specification { "initialPosition": { "type": "TrimHorizon" }, - "leaseDuration": "20 seconds" + "leaseDuration": "20 seconds", + "maxLeasesToStealAtOneTimeFactor": 0.42 } """ @@ -50,7 +51,8 @@ class KinesisSourceConfigSpec extends Specification { c.workerIdentifier must beEqualTo("my-identifier"), c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon), c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)), - c.leaseDuration must beEqualTo(20.seconds) + c.leaseDuration must beEqualTo(20.seconds), + c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)) ).reduce(_ and _) } } @@ -68,7 +70,8 @@ class KinesisSourceConfigSpec extends Specification { "initialPosition": { "type": "TRIM_HORIZON" }, - "leaseDuration": "20 seconds" + "leaseDuration": "20 seconds", + "maxLeasesToStealAtOneTimeFactor": 0.42 } """ @@ -79,7 +82,8 @@ class KinesisSourceConfigSpec extends Specification { c.workerIdentifier must beEqualTo("my-identifier"), c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon), c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)), - c.leaseDuration must beEqualTo(20.seconds) + c.leaseDuration must beEqualTo(20.seconds), + c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42)) ).reduce(_ and _) } } @@ -98,15 +102,16 @@ class KinesisSourceConfigSpec extends Specification { val result = ConfigFactory.load(ConfigFactory.parseString(input)) val expected = KinesisSourceConfig( - appName = "my-app", - streamName = "my-stream", - workerIdentifier = System.getenv("HOSTNAME"), - initialPosition = KinesisSourceConfig.InitialPosition.Latest, - retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), - customEndpoint = None, - dynamodbCustomEndpoint = None, - cloudwatchCustomEndpoint = None, - leaseDuration = 10.seconds + appName = "my-app", + streamName = "my-stream", + workerIdentifier = System.getenv("HOSTNAME"), + initialPosition = KinesisSourceConfig.InitialPosition.Latest, + retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), + customEndpoint = None, + dynamodbCustomEndpoint = None, + cloudwatchCustomEndpoint = None, + leaseDuration = 10.seconds, + maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0) ) result.as[Wrapper] must beRight.like { case w: Wrapper =>