diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/Checkpointable.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/Checkpointable.scala new file mode 100644 index 0000000..3609105 --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/Checkpointable.scala @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.kinesis + +import cats.implicits._ +import cats.{Order, Semigroup} +import software.amazon.kinesis.processor.RecordProcessorCheckpointer +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber + +import java.util.concurrent.CountDownLatch + +private sealed trait Checkpointable { + def extendedSequenceNumber: ExtendedSequenceNumber +} + +private object Checkpointable { + final case class Record(extendedSequenceNumber: ExtendedSequenceNumber, checkpointer: RecordProcessorCheckpointer) extends Checkpointable + + final case class ShardEnd(checkpointer: RecordProcessorCheckpointer, release: CountDownLatch) extends Checkpointable { + override def extendedSequenceNumber: ExtendedSequenceNumber = ExtendedSequenceNumber.SHARD_END + } + + implicit def checkpointableOrder: Order[Checkpointable] = Order.from { case (a, b) => + a.extendedSequenceNumber.compareTo(b.extendedSequenceNumber) + } + + implicit def checkpointableSemigroup: Semigroup[Checkpointable] = new Semigroup[Checkpointable] { + def combine(x: Checkpointable, y: Checkpointable): Checkpointable = + x.max(y) + } +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLAction.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLAction.scala new file mode 100644 index 0000000..f3774d5 --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLAction.scala @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.kinesis + +import software.amazon.kinesis.lifecycle.events.{ProcessRecordsInput, ShardEndedInput} + +import java.util.concurrent.CountDownLatch + +private sealed trait KCLAction + +private object KCLAction { + + final case class ProcessRecords(shardId: String, processRecordsInput: ProcessRecordsInput) extends KCLAction + final case class ShardEnd( + shardId: String, + await: CountDownLatch, + shardEndedInput: ShardEndedInput + ) extends KCLAction + final case class KCLError(t: Throwable) extends KCLAction + +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLScheduler.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLScheduler.scala new file mode 100644 index 0000000..55ae8bd --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KCLScheduler.scala @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.kinesis + +import cats.effect.implicits._ +import cats.effect.{Async, Resource, Sync} +import cats.implicits._ +import com.snowplowanalytics.snowplow.sources.kinesis.KCLAction.KCLError +import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient +import software.amazon.kinesis.common.{ConfigsBuilder, InitialPositionInStream, InitialPositionInStreamExtended} +import software.amazon.kinesis.coordinator.{Scheduler, WorkerStateChangeListener} +import software.amazon.kinesis.metrics.MetricsLevel +import software.amazon.kinesis.processor.SingleStreamTracker +import software.amazon.kinesis.retrieval.fanout.FanOutConfig +import software.amazon.kinesis.retrieval.polling.PollingConfig + +import java.net.URI +import java.util.Date +import java.util.concurrent.SynchronousQueue +import java.util.concurrent.atomic.AtomicReference + +private[kinesis] object KCLScheduler { + + def populateQueue[F[_]: Async]( + config: KinesisSourceConfig, + queue: SynchronousQueue[KCLAction] + ): Resource[F, Unit] = + for { + kinesis <- mkKinesisClient[F](config.customEndpoint) + dynamo <- mkDynamoDbClient[F](config.dynamodbCustomEndpoint) + cloudWatch <- mkCloudWatchClient[F](config.cloudwatchCustomEndpoint) + scheduler <- Resource.eval(mkScheduler(kinesis, dynamo, cloudWatch, config, queue)) + _ <- runInBackground(scheduler) + } yield () + + private def mkScheduler[F[_]: Sync]( + kinesisClient: KinesisAsyncClient, + dynamoDbClient: DynamoDbAsyncClient, + cloudWatchClient: CloudWatchAsyncClient, + kinesisConfig: KinesisSourceConfig, + queue: SynchronousQueue[KCLAction] + ): F[Scheduler] = + Sync[F].delay { + val configsBuilder = + new ConfigsBuilder( + kinesisConfig.streamName, + kinesisConfig.appName, + kinesisClient, + dynamoDbClient, + cloudWatchClient, + kinesisConfig.workerIdentifier, + () => 
ShardRecordProcessor(queue, new AtomicReference(Set.empty[String])) + ) + + val retrievalConfig = + configsBuilder.retrievalConfig + .streamTracker(new SingleStreamTracker(kinesisConfig.streamName, initialPositionOf(kinesisConfig.initialPosition))) + .retrievalSpecificConfig { + kinesisConfig.retrievalMode match { + case KinesisSourceConfig.Retrieval.FanOut => + new FanOutConfig(kinesisClient).streamName(kinesisConfig.streamName).applicationName(kinesisConfig.appName) + case KinesisSourceConfig.Retrieval.Polling(maxRecords) => + new PollingConfig(kinesisConfig.streamName, kinesisClient).maxRecords(maxRecords) + } + } + + val leaseManagementConfig = + configsBuilder.leaseManagementConfig + .failoverTimeMillis(kinesisConfig.leaseDuration.toMillis) + + // We ask to see empty batches, so that we can update the health check even when there are no records in the stream + val processorConfig = + configsBuilder.processorConfig + .callProcessRecordsEvenForEmptyRecordList(true) + + val coordinatorConfig = configsBuilder.coordinatorConfig + .workerStateChangeListener(new WorkerStateChangeListener { + def onWorkerStateChange(newState: WorkerStateChangeListener.WorkerState): Unit = () + override def onAllInitializationAttemptsFailed(e: Throwable): Unit = + queue.put(KCLError(e)) + }) + + new Scheduler( + configsBuilder.checkpointConfig, + coordinatorConfig, + leaseManagementConfig, + configsBuilder.lifecycleConfig, + configsBuilder.metricsConfig.metricsLevel(MetricsLevel.NONE), + processorConfig, + retrievalConfig + ) + } + + private def runInBackground[F[_]: Async](scheduler: Scheduler): Resource[F, Unit] = + Sync[F].blocking(scheduler.run()).background *> Resource.onFinalize(Sync[F].blocking(scheduler.shutdown())) + + private def initialPositionOf(config: KinesisSourceConfig.InitialPosition): InitialPositionInStreamExtended = + config match { + case KinesisSourceConfig.InitialPosition.Latest => InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST) + case KinesisSourceConfig.InitialPosition.TrimHorizon => + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) + case KinesisSourceConfig.InitialPosition.AtTimestamp(instant) => + InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(instant)) + } + + private def mkKinesisClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, KinesisAsyncClient] = + Resource.fromAutoCloseable { + Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint + val builder = + KinesisAsyncClient + .builder() + .defaultsMode(DefaultsMode.AUTO) + val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder) + customized.build + } + } + + private def mkDynamoDbClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, DynamoDbAsyncClient] = + Resource.fromAutoCloseable { + Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint + val builder = + DynamoDbAsyncClient + .builder() + .defaultsMode(DefaultsMode.AUTO) + val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder) + customized.build + } + } + + private def mkCloudWatchClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, CloudWatchAsyncClient] = + Resource.fromAutoCloseable { + Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint + val builder = + CloudWatchAsyncClient + .builder() + .defaultsMode(DefaultsMode.AUTO) + val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder) + 
customized.build + } + } + +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisCheckpointer.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisCheckpointer.scala new file mode 100644 index 0000000..7190e1f --- /dev/null +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisCheckpointer.scala @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.sources.kinesis + +import cats.effect.{Async, Sync} +import cats.implicits._ +import cats.effect.implicits._ +import com.snowplowanalytics.snowplow.sources.internal.Checkpointer +import org.typelevel.log4cats.Logger +import software.amazon.kinesis.exceptions.ShutdownException +import software.amazon.kinesis.processor.RecordProcessorCheckpointer +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber + +import java.util.concurrent.CountDownLatch + +private class KinesisCheckpointer[F[_]: Async: Logger] extends Checkpointer[F, Map[String, Checkpointable]] { + + override val empty: Map[String, Checkpointable] = Map.empty + + override def combine(x: Map[String, Checkpointable], y: Map[String, Checkpointable]): Map[String, Checkpointable] = + x |+| y + + override def ack(c: Map[String, Checkpointable]): F[Unit] = + c.toList.parTraverse_ { + case (shardId, Checkpointable.Record(extendedSequenceNumber, checkpointer)) => + checkpointRecord(shardId, extendedSequenceNumber, checkpointer) + case (shardId, Checkpointable.ShardEnd(checkpointer, release)) => + checkpointShardEnd(shardId, checkpointer, release) + } + + override def nack(c: Map[String, Checkpointable]): F[Unit] = + Sync[F].unit + + private def checkpointShardEnd( + shardId: String, + checkpointer: RecordProcessorCheckpointer, + release: CountDownLatch + ) = + Logger[F].debug(s"Checkpointing shard $shardId at SHARD_END") *> + Sync[F].blocking(checkpointer.checkpoint()).recoverWith(ignoreShutdownExceptions(shardId)) *> + Sync[F].delay(release.countDown()) + + private def checkpointRecord( + shardId: String, + extendedSequenceNumber: ExtendedSequenceNumber, + checkpointer: RecordProcessorCheckpointer + ) = + Logger[F].debug(s"Checkpointing shard $shardId at $extendedSequenceNumber") *> + Sync[F] + .blocking( + checkpointer.checkpoint(extendedSequenceNumber.sequenceNumber, extendedSequenceNumber.subSequenceNumber) + ) + .recoverWith(ignoreShutdownExceptions(shardId)) + + private def ignoreShutdownExceptions(shardId: String): PartialFunction[Throwable, F[Unit]] = { case _: ShutdownException => + // The ShardRecordProcessor instance has been shutdown. This just means another KCL + // worker has stolen our lease. It is expected during autoscaling of instances, and is + // safe to ignore. 
+ Logger[F].warn(s"Skipping checkpointing of shard $shardId because this worker no longer owns the lease") + } +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala index ecc3023..25d256c 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala @@ -7,334 +7,98 @@ */ package com.snowplowanalytics.snowplow.sources.kinesis -import cats.{Order, Semigroup} -import cats.effect.{Async, Ref, Resource, Sync} -import cats.effect.implicits._ +import cats.effect.{Async, Ref, Sync} import cats.implicits._ +import com.snowplowanalytics.snowplow.sources.SourceAndAck +import com.snowplowanalytics.snowplow.sources.internal.{LowLevelEvents, LowLevelSource} import fs2.{Chunk, Pull, Stream} -import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger} import org.typelevel.log4cats.slf4j.Slf4jLogger -import software.amazon.awssdk.awscore.defaultsmode.DefaultsMode -import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient -import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient -import software.amazon.kinesis.common.{ConfigsBuilder, InitialPositionInStream, InitialPositionInStreamExtended} -import software.amazon.kinesis.coordinator.{Scheduler, WorkerStateChangeListener} -import software.amazon.kinesis.exceptions.ShutdownException -import software.amazon.kinesis.lifecycle.events.{ - InitializationInput, - LeaseLostInput, - ProcessRecordsInput, - ShardEndedInput, - ShutdownRequestedInput -} -import software.amazon.kinesis.metrics.MetricsLevel -import software.amazon.kinesis.processor.{ - RecordProcessorCheckpointer, - ShardRecordProcessor, - ShardRecordProcessorFactory, - SingleStreamTracker -} -import software.amazon.kinesis.retrieval.fanout.FanOutConfig -import software.amazon.kinesis.retrieval.polling.PollingConfig +import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger} +import software.amazon.kinesis.lifecycle.events.{ProcessRecordsInput, ShardEndedInput} import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber -import java.net.URI -import java.util.Date import java.util.concurrent.{CountDownLatch, SynchronousQueue} -import java.util.concurrent.atomic.AtomicReference import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ -import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource} -import com.snowplowanalytics.snowplow.sources.SourceAndAck - object KinesisSource { private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F] def build[F[_]: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] = - Ref.ofEffect(Sync[F].realTime).flatMap { livenessRef => - LowLevelSource.toSourceAndAck(lowLevel(config, livenessRef)) - } - - sealed trait Checkpointable { - def extendedSequenceNumber: ExtendedSequenceNumber - } - private case class RecordCheckpointable(extendedSequenceNumber: ExtendedSequenceNumber, checkpointer: RecordProcessorCheckpointer) - extends Checkpointable - private case class ShardEndCheckpointable(checkpointer: RecordProcessorCheckpointer, release: CountDownLatch) extends Checkpointable { - override def extendedSequenceNumber: ExtendedSequenceNumber = 
ExtendedSequenceNumber.SHARD_END - } - - private type KinesisCheckpointer[F[_]] = Checkpointer[F, Map[String, Checkpointable]] - - private def lowLevel[F[_]: Async]( - config: KinesisSourceConfig, - livenessRef: Ref[F, FiniteDuration] - ): LowLevelSource[F, Map[String, Checkpointable]] = - new LowLevelSource[F, Map[String, Checkpointable]] { - def checkpointer: KinesisCheckpointer[F] = kinesisCheckpointer[F] - - def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = - kinesisStream(config, livenessRef) - - def lastLiveness: F[FiniteDuration] = - livenessRef.get - } - - private implicit def checkpointableOrder: Order[Checkpointable] = Order.from { case (a, b) => - a.extendedSequenceNumber.compareTo(b.extendedSequenceNumber) - } - - private implicit def checkpointableSemigroup: Semigroup[Checkpointable] = new Semigroup[Checkpointable] { - def combine(x: Checkpointable, y: Checkpointable): Checkpointable = - x.max(y) - } - - private def ignoreShutdownExceptions[F[_]: Sync](shardId: String): PartialFunction[Throwable, F[Unit]] = { case _: ShutdownException => - // The ShardRecordProcessor instance has been shutdown. This just means another KCL - // worker has stolen our lease. It is expected during autoscaling of instances, and is - // safe to ignore. - Logger[F].warn(s"Skipping checkpointing of shard $shardId because this worker no longer owns the lease") - } - - private def kinesisCheckpointer[F[_]: Async]: KinesisCheckpointer[F] = new KinesisCheckpointer[F] { - def combine(x: Map[String, Checkpointable], y: Map[String, Checkpointable]): Map[String, Checkpointable] = - x |+| y - - val empty: Map[String, Checkpointable] = Map.empty - - def ack(c: Map[String, Checkpointable]): F[Unit] = - c.toList.parTraverse_ { - case (shardId, RecordCheckpointable(extendedSequenceNumber, checkpointer)) => - Logger[F].debug(s"Checkpointing shard $shardId at $extendedSequenceNumber") *> - Sync[F] - .blocking( - checkpointer.checkpoint(extendedSequenceNumber.sequenceNumber, extendedSequenceNumber.subSequenceNumber) - ) - .recoverWith(ignoreShutdownExceptions(shardId)) - case (shardId, ShardEndCheckpointable(checkpointer, release)) => - Logger[F].debug(s"Checkpointing shard $shardId at SHARD_END") *> - Sync[F].blocking(checkpointer.checkpoint()).recoverWith(ignoreShutdownExceptions(shardId)) *> - Sync[F].delay(release.countDown) + Ref.ofEffect(Sync[F].realTime).flatMap { liveness => + LowLevelSource.toSourceAndAck { + new LowLevelSource[F, Map[String, Checkpointable]] { + def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = + kinesisStream(config, liveness) + + def checkpointer: KinesisCheckpointer[F] = + new KinesisCheckpointer[F]() + + def lastLiveness: F[FiniteDuration] = + liveness.get + } } - - def nack(c: Map[String, Checkpointable]): F[Unit] = - Sync[F].unit - } - - private sealed trait KCLAction - private case class ProcessRecords(shardId: String, processRecordsInput: ProcessRecordsInput) extends KCLAction - private case class ShardEnd( - shardId: String, - await: CountDownLatch, - shardEndedInput: ShardEndedInput - ) extends KCLAction - private case class KCLError(t: Throwable) extends KCLAction + } private def kinesisStream[F[_]: Async]( config: KinesisSourceConfig, - livenessRef: Ref[F, FiniteDuration] - ): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = + liveness: Ref[F, FiniteDuration] + ): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = { + val actionQueue = new SynchronousQueue[KCLAction]() for { - 
kinesisClient <- Stream.resource(mkKinesisClient[F](config.customEndpoint)) - dynamoClient <- Stream.resource(mkDynamoDbClient[F](config.dynamodbCustomEndpoint)) - cloudWatchClient <- Stream.resource(mkCloudWatchClient[F](config.cloudwatchCustomEndpoint)) - queue = new SynchronousQueue[KCLAction] - scheduler <- Stream.eval(scheduler(kinesisClient, dynamoClient, cloudWatchClient, config, queue)) - _ <- Stream.resource(runRecordProcessor[F](scheduler)) - s <- Stream.emit(pullFromQueue(queue, livenessRef).stream).repeat - } yield s + _ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue)) + events <- Stream.emit(pullFromQueue(actionQueue, liveness).stream).repeat + } yield events + } private def pullFromQueue[F[_]: Sync]( queue: SynchronousQueue[KCLAction], - livenessRef: Ref[F, FiniteDuration] + liveness: Ref[F, FiniteDuration] ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] = - for { - maybeE <- Pull.eval(Sync[F].delay(Option(queue.poll))) - e <- maybeE match { - case Some(e) => Pull.pure(e) - case None => Pull.eval(Sync[F].interruptible(queue.take)) - } - now <- Pull.eval(Sync[F].realTime) - _ <- Pull.eval(livenessRef.set(now)) - _ <- e match { - case ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty => - pullFromQueue[F](queue, livenessRef) - case ProcessRecords(shardId, processRecordsInput) => - val chunk = Chunk.javaList(processRecordsInput.records()).map(_.data()) - val lastRecord = processRecordsInput.records.asScala.last // last is safe because we handled the empty case above - val checkpointable = RecordCheckpointable( - new ExtendedSequenceNumber(lastRecord.sequenceNumber, lastRecord.subSequenceNumber), - processRecordsInput.checkpointer - ) - val next = - LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(lastRecord.approximateArrivalTimestamp)) - Pull.output1(next).covary[F] *> pullFromQueue[F](queue, livenessRef) - case ShardEnd(shardId, await, shardEndedInput) => - val checkpointable = ShardEndCheckpointable(shardEndedInput.checkpointer, await) - val last = LowLevelEvents(Chunk.empty, Map[String, Checkpointable](shardId -> checkpointable), None) - Pull - .eval(Logger[F].info(s"Ending this window of events early because reached the end of Kinesis shard $shardId")) - .covaryOutput *> - Pull.output1(last).covary[F] *> Pull.done - case KCLError(t) => - Pull.eval(Logger[F].error(t)("Exception from Kinesis source")) *> Pull.raiseError[F](t) - } - } yield () - - private def runRecordProcessor[F[_]: Async](scheduler: Scheduler): Resource[F, Unit] = - Sync[F].blocking(scheduler.run()).background *> Resource.onFinalize(Sync[F].blocking(scheduler.shutdown())) - - private def shardRecordProcessor( - queue: SynchronousQueue[KCLAction], - currentShardIds: AtomicReference[Set[String]] - ): ShardRecordProcessor = new ShardRecordProcessor { - private var shardId: String = _ - - def initialize(initializationInput: InitializationInput): Unit = { - shardId = initializationInput.shardId - val oldSet = currentShardIds.getAndUpdate(_ + shardId) - if (oldSet.contains(shardId)) { - // This is a rare edge-case scenario. Three things must all happen to hit this scenario: - // 1. KCL fails to renew a lease due to some transient runtime error - // 2. KCL re-aquires the lost lease for the same shard - // 3. The original ShardRecordProcessor is not terminated until after KCL re-aquires the lease - // This is a very unhealthy state, so we should kill the app. 
- val action = KCLError(new RuntimeException(s"Refusing to initialize a duplicate record processor for shard $shardId")) - queue.put(action) - } - } - - def shardEnded(shardEndedInput: ShardEndedInput): Unit = { - val countDownLatch = new CountDownLatch(1) - queue.put(ShardEnd(shardId, countDownLatch, shardEndedInput)) - countDownLatch.await() - currentShardIds.updateAndGet(_ - shardId) - () + Pull.eval(resolveNextAction(queue, liveness)).flatMap { + case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty => + pullFromQueue[F](queue, liveness) + case KCLAction.ProcessRecords(shardId, processRecordsInput) => + Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F] *> pullFromQueue[F](queue, liveness) + case KCLAction.ShardEnd(shardId, await, shardEndedInput) => + handleShardEnd[F](shardId, await, shardEndedInput) *> Pull.done + case KCLAction.KCLError(t) => + Pull.eval(Logger[F].error(t)("Exception from Kinesis source")) *> Pull.raiseError[F](t) } - def processRecords(processRecordsInput: ProcessRecordsInput): Unit = { - val action = ProcessRecords(shardId, processRecordsInput) - queue.put(action) + private def resolveNextAction[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[KCLAction] = { + val nextAction = Sync[F].delay(Option[KCLAction](queue.poll)).flatMap { + case Some(action) => Sync[F].pure(action) + case None => Sync[F].interruptible(queue.take) } - - def leaseLost(leaseLostInput: LeaseLostInput): Unit = { - currentShardIds.updateAndGet(_ - shardId) - () - } - - def shutdownRequested(shutdownRequestedInput: ShutdownRequestedInput): Unit = () + nextAction <* updateLiveness(liveness) } - private def recordProcessorFactory( - queue: SynchronousQueue[KCLAction], - currentShardIds: AtomicReference[Set[String]] - ): ShardRecordProcessorFactory = { () => - shardRecordProcessor(queue, currentShardIds) + private def updateLiveness[F[_]: Sync](liveness: Ref[F, FiniteDuration]): F[Unit] = + Sync[F].realTime.flatMap(now => liveness.set(now)) + + private def provideNextChunk(shardId: String, input: ProcessRecordsInput) = { + val chunk = Chunk.javaList(input.records()).map(_.data()) + val lastRecord = input.records.asScala.last // last is safe because we handled the empty case above + val checkpointable = Checkpointable.Record( + new ExtendedSequenceNumber(lastRecord.sequenceNumber, lastRecord.subSequenceNumber), + input.checkpointer + ) + LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(lastRecord.approximateArrivalTimestamp)) } - private def initialPositionOf(config: KinesisSourceConfig.InitialPosition): InitialPositionInStreamExtended = - config match { - case KinesisSourceConfig.InitialPosition.Latest => InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST) - case KinesisSourceConfig.InitialPosition.TrimHorizon => - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) - case KinesisSourceConfig.InitialPosition.AtTimestamp(instant) => - InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(instant)) - } - - private def scheduler[F[_]: Sync]( - kinesisClient: KinesisAsyncClient, - dynamoDbClient: DynamoDbAsyncClient, - cloudWatchClient: CloudWatchAsyncClient, - kinesisConfig: KinesisSourceConfig, - queue: SynchronousQueue[KCLAction] - ): F[Scheduler] = - Sync[F].delay { - val configsBuilder = - new ConfigsBuilder( - kinesisConfig.streamName, - kinesisConfig.appName, - kinesisClient, 
- dynamoDbClient, - cloudWatchClient, - kinesisConfig.workerIdentifier, - recordProcessorFactory(queue, new AtomicReference(Set.empty[String])) - ) - - val retrievalConfig = - configsBuilder.retrievalConfig - .streamTracker(new SingleStreamTracker(kinesisConfig.streamName, initialPositionOf(kinesisConfig.initialPosition))) - .retrievalSpecificConfig { - kinesisConfig.retrievalMode match { - case KinesisSourceConfig.Retrieval.FanOut => - new FanOutConfig(kinesisClient).streamName(kinesisConfig.streamName).applicationName(kinesisConfig.appName) - case KinesisSourceConfig.Retrieval.Polling(maxRecords) => - new PollingConfig(kinesisConfig.streamName, kinesisClient).maxRecords(maxRecords) - } - } - - val leaseManagementConfig = - configsBuilder.leaseManagementConfig - .failoverTimeMillis(kinesisConfig.leaseDuration.toMillis) - - // We ask to see empty batches, so that we can update the health check even when there are no records in the stream - val processorConfig = - configsBuilder.processorConfig - .callProcessRecordsEvenForEmptyRecordList(true) - - val coordinatorConfig = configsBuilder.coordinatorConfig - .workerStateChangeListener(new WorkerStateChangeListener { - def onWorkerStateChange(newState: WorkerStateChangeListener.WorkerState): Unit = () - override def onAllInitializationAttemptsFailed(e: Throwable): Unit = - queue.put(KCLError(e)) - }) - - new Scheduler( - configsBuilder.checkpointConfig, - coordinatorConfig, - leaseManagementConfig, - configsBuilder.lifecycleConfig, - configsBuilder.metricsConfig.metricsLevel(MetricsLevel.NONE), - processorConfig, - retrievalConfig - ) - } - - private def mkKinesisClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, KinesisAsyncClient] = - Resource.fromAutoCloseable { - Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint - val builder = - KinesisAsyncClient - .builder() - .defaultsMode(DefaultsMode.AUTO) - val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder) - customized.build - } - } - - private def mkDynamoDbClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, DynamoDbAsyncClient] = - Resource.fromAutoCloseable { - Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint - val builder = - DynamoDbAsyncClient - .builder() - .defaultsMode(DefaultsMode.AUTO) - val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder) - customized.build - } - } + private def handleShardEnd[F[_]: Sync]( + shardId: String, + await: CountDownLatch, + shardEndedInput: ShardEndedInput + ) = { + val checkpointable = Checkpointable.ShardEnd(shardEndedInput.checkpointer, await) + val last = LowLevelEvents(Chunk.empty, Map[String, Checkpointable](shardId -> checkpointable), None) + Pull + .eval(Logger[F].info(s"Ending this window of events early because reached the end of Kinesis shard $shardId")) + .covaryOutput *> + Pull.output1(last).covary[F] + } - private def mkCloudWatchClient[F[_]: Sync](customEndpoint: Option[URI]): Resource[F, CloudWatchAsyncClient] = - Resource.fromAutoCloseable { - Sync[F].blocking { // Blocking because this might dial the EC2 metadata endpoint - val builder = - CloudWatchAsyncClient - .builder() - .defaultsMode(DefaultsMode.AUTO) - val customized = customEndpoint.map(builder.endpointOverride).getOrElse(builder) - customized.build - } - } } diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/ShardRecordProcessor.scala 
b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/ShardRecordProcessor.scala
new file mode 100644
index 0000000..8b67637
--- /dev/null
+++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/ShardRecordProcessor.scala
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Snowplow Community License Version 1.0,
+ * and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
+ * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
+ */
+package com.snowplowanalytics.snowplow.sources.kinesis
+
+import software.amazon.kinesis.lifecycle.events.{
+  InitializationInput,
+  LeaseLostInput,
+  ProcessRecordsInput,
+  ShardEndedInput,
+  ShutdownRequestedInput
+}
+import software.amazon.kinesis.processor.{ShardRecordProcessor => KCLShardProcessor}
+
+import java.util.concurrent.{CountDownLatch, SynchronousQueue}
+import java.util.concurrent.atomic.AtomicReference
+
+private[kinesis] object ShardRecordProcessor {
+
+  def apply(
+    queue: SynchronousQueue[KCLAction],
+    currentShardIds: AtomicReference[Set[String]]
+  ): KCLShardProcessor = new KCLShardProcessor {
+    private var shardId: String = _
+
+    override def initialize(initializationInput: InitializationInput): Unit = {
+      shardId = initializationInput.shardId
+      val oldSet = currentShardIds.getAndUpdate(_ + shardId)
+      if (oldSet.contains(shardId)) {
+        // This is a rare edge-case scenario. Three things must all happen to hit this scenario:
+        // 1. KCL fails to renew a lease due to some transient runtime error
+        // 2. KCL re-acquires the lost lease for the same shard
+        // 3. The original ShardRecordProcessor is not terminated until after KCL re-acquires the lease
+        // This is a very unhealthy state, so we should kill the app.
+        val action = KCLAction.KCLError(new RuntimeException(s"Refusing to initialize a duplicate record processor for shard $shardId"))
+        queue.put(action)
+      }
+    }
+
+    override def shardEnded(shardEndedInput: ShardEndedInput): Unit = {
+      val countDownLatch = new CountDownLatch(1)
+      queue.put(KCLAction.ShardEnd(shardId, countDownLatch, shardEndedInput))
+      countDownLatch.await()
+      currentShardIds.updateAndGet(_ - shardId)
+      ()
+    }
+
+    override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
+      val action = KCLAction.ProcessRecords(shardId, processRecordsInput)
+      queue.put(action)
+    }
+
+    override def leaseLost(leaseLostInput: LeaseLostInput): Unit = {
+      currentShardIds.updateAndGet(_ - shardId)
+      ()
+    }
+
+    override def shutdownRequested(shutdownRequestedInput: ShutdownRequestedInput): Unit = ()
+
+  }
+}
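
A note on the checkpointing semantics introduced in this diff: KinesisCheckpointer.combine merges per-shard maps with |+|, and the Semigroup[Checkpointable] instance resolves collisions by keeping the value with the larger extended sequence number, so acking a merged map checkpoints each shard once at the furthest acked position. The following self-contained sketch models that behaviour with an invented FakeCheckpointable type and made-up sequence numbers (the real Checkpointable wraps KCL's ExtendedSequenceNumber and RecordProcessorCheckpointer):

// Simplified model of the merge semantics in Checkpointable.scala / KinesisCheckpointer.combine.
// FakeCheckpointable and the sequence numbers below are illustrative stand-ins, not part of the diff.
import cats.{Order, Semigroup}
import cats.implicits._

final case class FakeCheckpointable(sequenceNumber: Long)

object FakeCheckpointable {
  implicit val order: Order[FakeCheckpointable] =
    Order.by[FakeCheckpointable, Long](_.sequenceNumber)

  // Mirrors checkpointableSemigroup: combining two checkpoints keeps the furthest position
  implicit val semigroup: Semigroup[FakeCheckpointable] =
    Semigroup.instance[FakeCheckpointable]((x, y) => x.max(y))
}

object MergeSemanticsExample extends App {
  // Two acked windows keyed by shard id, merged the same way the checkpointer merges its maps
  val older = Map("shard-0001" -> FakeCheckpointable(10L), "shard-0002" -> FakeCheckpointable(7L))
  val newer = Map("shard-0001" -> FakeCheckpointable(42L))

  // shard-0001 keeps the later checkpoint (42); shard-0002 is carried over unchanged
  println(older |+| newer)
}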
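
The shard-end path relies on a handshake between two threads: ShardRecordProcessor.shardEnded puts a ShardEnd action on the queue and blocks on a CountDownLatch, and KinesisCheckpointer.ack releases that latch only after the SHARD_END checkpoint has been written, so KCL cannot finish the shard before the checkpoint lands. Below is a minimal stand-alone model of that handshake, using invented names and plain threads instead of the KCL and cats-effect machinery:

// Minimal stand-alone model of the shard-end handshake between ShardRecordProcessor and
// KinesisCheckpointer. FakeShardEnd and the thread bodies are illustrative, not part of the diff.
import java.util.concurrent.{CountDownLatch, SynchronousQueue}

object ShardEndHandshakeExample extends App {
  final case class FakeShardEnd(shardId: String, release: CountDownLatch)

  val queue = new SynchronousQueue[FakeShardEnd]()

  // Plays the role of ShardRecordProcessor.shardEnded: hand over the action, then block
  val kclThread = new Thread(() => {
    val latch = new CountDownLatch(1)
    queue.put(FakeShardEnd("shard-0001", latch))
    latch.await() // must not return until the SHARD_END checkpoint has been written
    println("KCL side: latch released, safe to let the shard finish")
  })
  kclThread.start()

  // Plays the role of KinesisCheckpointer.ack handling a Checkpointable.ShardEnd
  val shardEnd = queue.take()
  println(s"Checkpointer side: checkpointing ${shardEnd.shardId} at SHARD_END")
  shardEnd.release.countDown()

  kclThread.join()
}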