From f8901529b36cf4fe208bb182677f5faca7bd9155 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Sun, 8 Sep 2024 21:07:27 +0100 Subject: [PATCH] Enable KCL option skipShardSyncAtWorkerInitializationIfLeasesExist --- .../snowplow/sources/kinesis/KinesisSource.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala index bded23a..84b179a 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala @@ -202,9 +202,13 @@ object KinesisSource { configsBuilder.processorConfig .callProcessRecordsEvenForEmptyRecordList(true) + val coordinatorConfig = + configsBuilder.coordinatorConfig + .skipShardSyncAtWorkerInitializationIfLeasesExist(true) + new Scheduler( configsBuilder.checkpointConfig, - configsBuilder.coordinatorConfig, + coordinatorConfig, leaseManagementConfig, configsBuilder.lifecycleConfig, configsBuilder.metricsConfig.metricsLevel(MetricsLevel.NONE),