Kinesis source improvements for larger deployments #99

Merged · 1 commit · Nov 26, 2024
@@ -95,7 +95,8 @@ object Utils {
Some(endpoint),
Some(endpoint),
Some(endpoint),
- 10.seconds
+ 10.seconds,
+ BigDecimal(1.0)
)

def getKinesisSinkConfig(endpoint: URI)(streamName: String): KinesisSinkConfig = KinesisSinkConfig(
1 change: 1 addition & 0 deletions modules/kinesis/src/main/resources/reference.conf
@@ -10,6 +10,7 @@ snowplow.defaults: {
maxRecords: 1000
}
leaseDuration: "10 seconds"
+ maxLeasesToStealAtOneTimeFactor: 2.0
Contributor:
Curious that this is a decimal; you can't have a fraction of a lease, surely? It doesn't seem like a likely oversight, so I'm guessing there's a reason I don't see?

Contributor (Author):
This number is first multiplied by the number of processors. It is helpful to make it a decimal: e.g. maybe we want an 8-core instance to steal 2 leases at one time, so we set this value to 0.25.

I've been using this pattern in lots of places recently. It's a good way to make the app auto-guess a good configuration, instead of putting the burden on the user to configure it appropriately for the instance's vertical size. And I consistently use Factor as a suffix on params that get multiplied by the number of processors.

Contributor:
Ah I see, interesting. A bit nuanced, but I get it: it's cores * this factor = max leases to steal. I assume it rounds up to the nearest integer (ceiling), since it's a max... Cool, thanks for explaining!

}
}
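To make the arithmetic discussed in the thread above concrete, here is a minimal, self-contained Scala sketch of the "Factor" pattern. The object and method names are illustrative only and not part of this PR; the real logic is chooseMaxLeasesToStealAtOneTime in KCLScheduler, shown further down.

// Sketch: a configured factor is scaled by the number of available processors and
// rounded up, so bigger instances steal leases more aggressively without extra user config.
object LeaseStealingSketch {

  def maxLeasesToSteal(factor: BigDecimal, processors: Int): Int =
    (BigDecimal(processors) * factor)
      .setScale(0, BigDecimal.RoundingMode.UP) // round up to a whole number of leases
      .toInt

  def main(args: Array[String]): Unit = {
    println(maxLeasesToSteal(BigDecimal(0.25), processors = 8)) // 2: the 8-core example from the comment
    println(maxLeasesToSteal(BigDecimal(0.25), processors = 2)) // 1: small instances still steal one lease
    println(maxLeasesToSteal(BigDecimal(2.0), processors = 4))  // 8: the reference.conf default factor of 2.0
  }
}

Because the result is rounded up, any positive factor yields at least one lease to steal per cycle, even on a single-core instance.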

@@ -68,13 +68,16 @@ private[kinesis] object KCLScheduler {
case KinesisSourceConfig.Retrieval.FanOut =>
new FanOutConfig(kinesisClient).streamName(kinesisConfig.streamName).applicationName(kinesisConfig.appName)
case KinesisSourceConfig.Retrieval.Polling(maxRecords) =>
- new PollingConfig(kinesisConfig.streamName, kinesisClient).maxRecords(maxRecords)
+ val c = new PollingConfig(kinesisConfig.streamName, kinesisClient).maxRecords(maxRecords)
+ c.recordsFetcherFactory.maxPendingProcessRecordsInput(1)
+ c
}
}

val leaseManagementConfig =
configsBuilder.leaseManagementConfig
.failoverTimeMillis(kinesisConfig.leaseDuration.toMillis)
+ .maxLeasesToStealAtOneTime(chooseMaxLeasesToStealAtOneTime(kinesisConfig))

// We ask to see empty batches, so that we can update the health check even when there are no records in the stream
val processorConfig =
@@ -147,4 +150,14 @@
}
}

+ /**
+ * In order to avoid latency during scale up/down and pod-rotation, we want the app to be quick to
+ * acquire shard-leases to process. With bigger instances (more cores) we tend to have more
+ * shard-leases per instance, so we increase how aggressively it acquires leases.
+ */
+ private def chooseMaxLeasesToStealAtOneTime(config: KinesisSourceConfig): Int =
+ (Runtime.getRuntime.availableProcessors * config.maxLeasesToStealAtOneTimeFactor)
+ .setScale(0, BigDecimal.RoundingMode.UP)
+ .toInt

}
@@ -79,13 +79,14 @@ object KinesisSource {
Sync[F].realTime.flatMap(now => liveness.set(now))

private def provideNextChunk(shardId: String, input: ProcessRecordsInput) = {
val chunk = Chunk.javaList(input.records()).map(_.data())
val lastRecord = input.records.asScala.last // last is safe because we handled the empty case above
+ val firstRecord = input.records.asScala.head
val checkpointable = Checkpointable.Record(
new ExtendedSequenceNumber(lastRecord.sequenceNumber, lastRecord.subSequenceNumber),
input.checkpointer
)
- LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(lastRecord.approximateArrivalTimestamp))
+ LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(firstRecord.approximateArrivalTimestamp))
}

private def handleShardEnd[F[_]: Sync](
@@ -16,6 +16,26 @@ import java.net.URI
import java.time.Instant
import scala.concurrent.duration.FiniteDuration

+ /**
+ * Config to be supplied from the app's HOCON
+ *
Contributor:
Love the addition of in-code documentation for the other parameters too

+ * @param appName
+ * Corresponds to the DynamoDB table name
+ * @param workerIdentifier
+ * If a pod uses a consistent name, then whenever it restarts (e.g. after crashing or after a
+ * rollout) it can re-claim the leases it owned before the restart
+ * @param leaseDuration
+ * The KCL default for a lease is 10 seconds. If we increase this, then we allow a pod longer
+ * to re-claim its old leases after a restart.
+ * @param maxLeasesToStealAtOneTimeFactor
+ * Controls how to pick the max number of leases to steal at one time. The actual max number to
+ * steal is this factor multiplied by the number of runtime processors. In order to avoid latency
+ * during scale up/down and pod-rotation, we want the app to be quick to acquire shard-leases to
+ * process. With bigger instances (more cores/processors) we tend to have more shard-leases per
+ * instance, so we increase how aggressively it acquires leases.
+ *
+ * Other params are self-explanatory
+ */
case class KinesisSourceConfig(
appName: String,
streamName: String,
@@ -25,7 +45,8 @@ case class KinesisSourceConfig(
customEndpoint: Option[URI],
dynamodbCustomEndpoint: Option[URI],
cloudwatchCustomEndpoint: Option[URI],
- leaseDuration: FiniteDuration
+ leaseDuration: FiniteDuration,
+ maxLeasesToStealAtOneTimeFactor: BigDecimal
)

object KinesisSourceConfig {
@@ -39,7 +39,8 @@ class KinesisSourceConfigSpec extends Specification {
"initialPosition": {
"type": "TrimHorizon"
},
"leaseDuration": "20 seconds"
"leaseDuration": "20 seconds",
"maxLeasesToStealAtOneTimeFactor": 0.42
}
"""

@@ -50,7 +51,8 @@
c.workerIdentifier must beEqualTo("my-identifier"),
c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
- c.leaseDuration must beEqualTo(20.seconds)
+ c.leaseDuration must beEqualTo(20.seconds),
+ c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42))
).reduce(_ and _)
}
}
@@ -68,7 +70,8 @@
"initialPosition": {
"type": "TRIM_HORIZON"
},
"leaseDuration": "20 seconds"
"leaseDuration": "20 seconds",
"maxLeasesToStealAtOneTimeFactor": 0.42
}
"""

@@ -79,7 +82,8 @@
c.workerIdentifier must beEqualTo("my-identifier"),
c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
- c.leaseDuration must beEqualTo(20.seconds)
+ c.leaseDuration must beEqualTo(20.seconds),
+ c.maxLeasesToStealAtOneTimeFactor must beEqualTo(BigDecimal(0.42))
).reduce(_ and _)
}
}
@@ -98,15 +102,16 @@
val result = ConfigFactory.load(ConfigFactory.parseString(input))

val expected = KinesisSourceConfig(
appName = "my-app",
streamName = "my-stream",
workerIdentifier = System.getenv("HOSTNAME"),
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds
appName = "my-app",
streamName = "my-stream",
workerIdentifier = System.getenv("HOSTNAME"),
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0)
)

result.as[Wrapper] must beRight.like { case w: Wrapper =>