Skip to content

Commit

Permalink
Bump common-streams to 0.9.0
Browse files Browse the repository at this point in the history
See snowplow-incubator/common-streams#99 for the relevant change

This library upgrade brings improvements to the Kinesis source, which
should help on vertically larger instances.
  • Loading branch information
istreeter committed Nov 26, 2024
1 parent 4a8e223 commit 4fcc289
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 19 deletions.
11 changes: 11 additions & 0 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@
"maxRecords": 1000
}

# -- Name of this KCL worker used in the dynamodb lease table
"workerIdentifier": ${HOSTNAME}

# -- Duration of shard leases. KCL workers must periodically refresh leases in the dynamodb table before this duration expires.
"leaseDuration": "10 seconds"

# -- Controls how to pick the max number of leases to steal at one time.
# -- E.g. If there are 4 available processors, and maxLeasesToStealAtOneTimeFactor = 2.0, then allow the KCL to steal up to 8 leases.
# -- Allows bigger instances to more quickly acquire the shard-leases they need to combat latency
"maxLeasesToStealAtOneTimeFactor": 2.0

}

"output": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,16 @@ class KinesisConfigSpec extends Specification with CatsEffect {
object KinesisConfigSpec {
private val minimalConfig = Config[KinesisSourceConfig, KinesisSinkConfig](
input = KinesisSourceConfig(
appName = "snowplow-snowflake-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds
appName = "snowplow-snowflake-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0)
),
output = Config.Output(
good = Config.Snowflake(
Expand Down Expand Up @@ -138,15 +139,16 @@ object KinesisConfigSpec {
*/
private val extendedConfig = Config[KinesisSourceConfig, KinesisSinkConfig](
input = KinesisSourceConfig(
appName = "snowplow-snowflake-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds
appName = "snowplow-snowflake-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0)
),
output = Config.Output(
good = Config.Snowflake(
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object Dependencies {
val protobuf = "3.25.5" // Version override

// Snowplow
val streams = "0.8.1"
val streams = "0.9.0-M2"

// tests
val specs2 = "4.20.0"
Expand Down

0 comments on commit 4fcc289

Please sign in to comment.