From fe5bbf21134842212113ec808d62e14200b09261 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Fri, 20 Dec 2024 12:22:31 +0000 Subject: [PATCH] Kinesis source improve management of shard ends In the Kinesis Source, we terminate the inner stream of events whenever we reach the end of a Kinesis shard. Terminating the inner stream is important, because it forces the application to fully process and checkpoint any outstanding events, and this unblocks our KCL record processor from checkpointing the end of the shard. Before this PR, we terminated the inner stream for _every_ shard end. But terminating the stream is quite inefficient, and during a re-sharding we probably reach many shard ends at similar time. This PR changes things so we try to handle many shard ends at the same time. During re-sharding, this should reduce the number of times we need to terminate the inner stream. --- .../sources/kinesis/KinesisSource.scala | 68 ++++++++++++++----- 1 file changed, 51 insertions(+), 17 deletions(-) diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala index 4ef3487..5994368 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/sources/kinesis/KinesisSource.scala @@ -8,6 +8,7 @@ package com.snowplowanalytics.snowplow.sources.kinesis import cats.effect.{Async, Ref, Sync} +import cats.data.NonEmptyList import cats.implicits._ import com.snowplowanalytics.snowplow.sources.SourceAndAck import com.snowplowanalytics.snowplow.sources.internal.{LowLevelEvents, LowLevelSource} @@ -48,25 +49,54 @@ object KinesisSource { val actionQueue = new SynchronousQueue[KCLAction]() for { _ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue)) - events <- Stream.emit(pullFromQueue(actionQueue, liveness).stream).repeat + events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat } yield events } - private def pullFromQueue[F[_]: Sync]( + private def pullFromQueueAndEmit[F[_]: Sync]( queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration] ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] = - Pull.eval(resolveNextAction(queue, liveness)).flatMap { - case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty => - pullFromQueue[F](queue, liveness) - case KCLAction.ProcessRecords(shardId, processRecordsInput) => - Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F] *> pullFromQueue[F](queue, liveness) - case KCLAction.ShardEnd(shardId, await, shardEndedInput) => - handleShardEnd[F](shardId, await, shardEndedInput) *> Pull.done - case KCLAction.KCLError(t) => - Pull.eval(Logger[F].error(t)("Exception from Kinesis source")) *> Pull.raiseError[F](t) + Pull.eval(pullFromQueue(queue, liveness)).flatMap { case PullFromQueueResult(actions, hasShardEnd) => + val toEmit = actions.traverse { + case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty => + Pull.done + case KCLAction.ProcessRecords(shardId, processRecordsInput) => + Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F] + case KCLAction.ShardEnd(shardId, await, shardEndedInput) => + handleShardEnd[F](shardId, await, shardEndedInput) + case KCLAction.KCLError(t) => + Pull.eval(Logger[F].error(t)("Exception from Kinesis source")) *> Pull.raiseError[F](t) + } + if (hasShardEnd) { + val log = Logger[F].info { + actions + .collect { case KCLAction.ShardEnd(shardId, _, _) => + shardId + } + .mkString("Ending this window of events early because reached the end of Kinesis shards: ", ",", "") + } + Pull.eval(log).covaryOutput *> toEmit *> Pull.done + } else + toEmit *> pullFromQueueAndEmit(queue, liveness) + } + + private case class PullFromQueueResult(actions: NonEmptyList[KCLAction], hasShardEnd: Boolean) + + private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[PullFromQueueResult] = + resolveNextAction(queue, liveness).flatMap { + case shardEnd: KCLAction.ShardEnd => + // If we reached the end of one shard, it is likely we reached the end of other shards too. + // Therefore pull more actions from the queue, to minimize the number of times we need to do + // an early close of the window. + resolveAllActions(queue).map { more => + PullFromQueueResult(NonEmptyList(shardEnd, more), hasShardEnd = true) + } + case other => + PullFromQueueResult(NonEmptyList.one(other), hasShardEnd = false).pure[F] } + /** Always returns a `KCLAction`, possibly waiting until one is available */ private def resolveNextAction[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[KCLAction] = { val nextAction = Sync[F].delay(Option[KCLAction](queue.poll)).flatMap { case Some(action) => Sync[F].pure(action) @@ -75,6 +105,13 @@ object KinesisSource { nextAction <* updateLiveness(liveness) } + /** Returns immediately, but the `List[KCLAction]` might be empty */ + private def resolveAllActions[F[_]: Sync](queue: SynchronousQueue[KCLAction]): F[List[KCLAction]] = + for { + ret <- Sync[F].delay(new java.util.ArrayList[KCLAction]()) + _ <- Sync[F].delay(queue.drainTo(ret)) + } yield ret.asScala.toList + private def updateLiveness[F[_]: Sync](liveness: Ref[F, FiniteDuration]): F[Unit] = Sync[F].realTime.flatMap(now => liveness.set(now)) @@ -89,17 +126,14 @@ object KinesisSource { LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(firstRecord.approximateArrivalTimestamp)) } - private def handleShardEnd[F[_]: Sync]( + private def handleShardEnd[F[_]]( shardId: String, await: CountDownLatch, shardEndedInput: ShardEndedInput - ) = { + ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] = { val checkpointable = Checkpointable.ShardEnd(shardEndedInput.checkpointer, await) val last = LowLevelEvents(Chunk.empty, Map[String, Checkpointable](shardId -> checkpointable), None) - Pull - .eval(Logger[F].info(s"Ending this window of events early because reached the end of Kinesis shard $shardId")) - .covaryOutput *> - Pull.output1(last).covary[F] + Pull.output1(last) } }