Kinesis source: improve management of shard ends
In the Kinesis source, we terminate the inner stream of events
whenever we reach the end of a Kinesis shard. Terminating the inner
stream is important, because it forces the application to fully process
and checkpoint any outstanding events, which in turn unblocks our KCL
record processor so it can checkpoint the end of the shard.
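
To illustrate that handshake, here is a minimal sketch using a
simplified stand-in for the real `KCLAction` type; the names
`onShardEnded` and `afterCheckpoint` are hypothetical, not the actual
wiring in `KCLScheduler`:

```scala
import java.util.concurrent.{CountDownLatch, SynchronousQueue}

// Simplified, hypothetical stand-in for the real KCLAction ADT
sealed trait Action
final case class ShardEnd(shardId: String, await: CountDownLatch) extends Action

val actionQueue = new SynchronousQueue[Action]()

// Runs on a KCL thread once a shard has been fully consumed
def onShardEnded(shardId: String): Unit = {
  val await = new CountDownLatch(1)
  actionQueue.put(ShardEnd(shardId, await)) // hand the shard end over to the stream
  await.await() // block until the application has checkpointed up to the shard end
}

// Runs on the stream side, after the inner stream has terminated and all
// outstanding events for the shard have been checkpointed
def afterCheckpoint(end: ShardEnd): Unit =
  end.await.countDown() // unblock the KCL thread so it can retire the shard
```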

Before this PR, we terminated the inner stream for _every_ shard end.
But terminating the stream is quite inefficient, and during a
re-sharding we are likely to reach many shard ends at around the same
time. This PR changes the source to handle many shard ends together in
a single batch, which should reduce the number of times we need to
terminate the inner stream during a re-sharding.
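
The batching works because `SynchronousQueue` has no capacity: each
producer thread blocks in `put` until a consumer takes its element, so
`drainTo` returns immediately with whatever the currently-parked
producers are offering. During a re-sharding, several KCL shard threads
are typically parked with their shard-end actions at once. A rough
sketch of the pattern, with a type parameter standing in for
`KCLAction`:

```scala
import java.util.concurrent.SynchronousQueue
import scala.jdk.CollectionConverters._

// Wait for the next action, then sweep up anything that other producer
// threads are already offering, without blocking a second time.
def nextBatch[A](queue: SynchronousQueue[A]): List[A] = {
  val first = queue.take() // blocks until some producer hands over an action
  val buffer = new java.util.ArrayList[A]()
  queue.drainTo(buffer) // non-blocking: collects only from already-waiting producers
  first :: buffer.asScala.toList
}
```

(The real change drains only when the first action is a shard end; see
`pullFromQueue` in the diff below.)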
istreeter committed Dec 20, 2024
1 parent 86f0316 commit fe5bbf2
Showing 1 changed file with 51 additions and 17 deletions.
@@ -8,6 +8,7 @@
 package com.snowplowanalytics.snowplow.sources.kinesis
 
 import cats.effect.{Async, Ref, Sync}
+import cats.data.NonEmptyList
 import cats.implicits._
 import com.snowplowanalytics.snowplow.sources.SourceAndAck
 import com.snowplowanalytics.snowplow.sources.internal.{LowLevelEvents, LowLevelSource}
@@ -48,25 +49,54 @@ object KinesisSource {
       val actionQueue = new SynchronousQueue[KCLAction]()
       for {
         _ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue))
-        events <- Stream.emit(pullFromQueue(actionQueue, liveness).stream).repeat
+        events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat
       } yield events
     }
 
-  private def pullFromQueue[F[_]: Sync](
+  private def pullFromQueueAndEmit[F[_]: Sync](
     queue: SynchronousQueue[KCLAction],
     liveness: Ref[F, FiniteDuration]
   ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] =
-    Pull.eval(resolveNextAction(queue, liveness)).flatMap {
-      case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty =>
-        pullFromQueue[F](queue, liveness)
-      case KCLAction.ProcessRecords(shardId, processRecordsInput) =>
-        Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F] *> pullFromQueue[F](queue, liveness)
-      case KCLAction.ShardEnd(shardId, await, shardEndedInput) =>
-        handleShardEnd[F](shardId, await, shardEndedInput) *> Pull.done
-      case KCLAction.KCLError(t) =>
-        Pull.eval(Logger[F].error(t)("Exception from Kinesis source")) *> Pull.raiseError[F](t)
+    Pull.eval(pullFromQueue(queue, liveness)).flatMap { case PullFromQueueResult(actions, hasShardEnd) =>
+      val toEmit = actions.traverse {
+        case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty =>
+          Pull.done
+        case KCLAction.ProcessRecords(shardId, processRecordsInput) =>
+          Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F]
+        case KCLAction.ShardEnd(shardId, await, shardEndedInput) =>
+          handleShardEnd[F](shardId, await, shardEndedInput)
+        case KCLAction.KCLError(t) =>
+          Pull.eval(Logger[F].error(t)("Exception from Kinesis source")) *> Pull.raiseError[F](t)
+      }
+      if (hasShardEnd) {
+        val log = Logger[F].info {
+          actions
+            .collect { case KCLAction.ShardEnd(shardId, _, _) =>
+              shardId
+            }
+            .mkString("Ending this window of events early because reached the end of Kinesis shards: ", ",", "")
+        }
+        Pull.eval(log).covaryOutput *> toEmit *> Pull.done
+      } else
+        toEmit *> pullFromQueueAndEmit(queue, liveness)
+    }
+
+  private case class PullFromQueueResult(actions: NonEmptyList[KCLAction], hasShardEnd: Boolean)
+
+  private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[PullFromQueueResult] =
+    resolveNextAction(queue, liveness).flatMap {
+      case shardEnd: KCLAction.ShardEnd =>
+        // If we reached the end of one shard, it is likely we reached the end of other shards too.
+        // Therefore pull more actions from the queue, to minimize the number of times we need to do
+        // an early close of the window.
+        resolveAllActions(queue).map { more =>
+          PullFromQueueResult(NonEmptyList(shardEnd, more), hasShardEnd = true)
+        }
+      case other =>
+        PullFromQueueResult(NonEmptyList.one(other), hasShardEnd = false).pure[F]
+    }
 
   /** Always returns a `KCLAction`, possibly waiting until one is available */
   private def resolveNextAction[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[KCLAction] = {
     val nextAction = Sync[F].delay(Option[KCLAction](queue.poll)).flatMap {
       case Some(action) => Sync[F].pure(action)
@@ -75,6 +105,13 @@ object KinesisSource {
     nextAction <* updateLiveness(liveness)
   }
 
+  /** Returns immediately, but the `List[KCLAction]` might be empty */
+  private def resolveAllActions[F[_]: Sync](queue: SynchronousQueue[KCLAction]): F[List[KCLAction]] =
+    for {
+      ret <- Sync[F].delay(new java.util.ArrayList[KCLAction]())
+      _ <- Sync[F].delay(queue.drainTo(ret))
+    } yield ret.asScala.toList
+
   private def updateLiveness[F[_]: Sync](liveness: Ref[F, FiniteDuration]): F[Unit] =
     Sync[F].realTime.flatMap(now => liveness.set(now))

@@ -89,17 +126,14 @@ object KinesisSource {
     LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(firstRecord.approximateArrivalTimestamp))
   }
 
-  private def handleShardEnd[F[_]: Sync](
+  private def handleShardEnd[F[_]](
     shardId: String,
     await: CountDownLatch,
     shardEndedInput: ShardEndedInput
-  ) = {
+  ): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] = {
     val checkpointable = Checkpointable.ShardEnd(shardEndedInput.checkpointer, await)
     val last = LowLevelEvents(Chunk.empty, Map[String, Checkpointable](shardId -> checkpointable), None)
-    Pull
-      .eval(Logger[F].info(s"Ending this window of events early because reached the end of Kinesis shard $shardId"))
-      .covaryOutput *>
-      Pull.output1(last).covary[F]
+    Pull.output1(last)
   }
 
 }
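
One note on the shape of `pullFromQueueAndEmit` above: `actions.traverse { ... }` builds a single `Pull` that emits the output of each action in order, and sequencing it before `Pull.done` is what terminates the inner stream. A toy example of the same pattern (not from this PR), assuming fs2 and cats-effect on the classpath:

```scala
import cats.data.NonEmptyList
import cats.effect.IO
import cats.implicits._
import fs2.Pull

val batch = NonEmptyList.of(1, 2, 3)

// Emit each element of the batch in order, then end the Pull
val pull: Pull[IO, Int, Unit] =
  batch.traverse(i => Pull.output1(i).covary[IO]) *> Pull.done

// pull.stream emits 1, 2, 3 and then terminates, just as the inner Kinesis
// stream emits the final events of each ended shard and then closes the window
val emitted: IO[List[Int]] = pull.stream.compile.toList
```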
