Skip to content

Commit

Permalink
Re-implement Kinesis source without fs2-kinesis
Browse files Browse the repository at this point in the history
The common-streams Kinesis source suffers from a problem where we don't
quite achieve at-least-once processing semantics near the end of a
shard. The problem was in the 3rd-party fs-kinesis library, and it is
not easy to fix with any small code change to that library.

Sorry I cannot provide a link here to back that up -- it is documented
internally at Snowplow.

This PR re-implements our Kinesis source from scratch, this time without
a dependency on fs2-kinesis.  The biggest difference is the way we block
the `shardEnded` method of the KCL record processor, until all records
from the shard have been written to the destination.
  • Loading branch information
istreeter committed Sep 9, 2024
1 parent 65b7725 commit a6b96d0
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import cats.effect.{IO, Ref}
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

import eu.timepit.refined.types.numeric.PosInt

import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
Expand Down Expand Up @@ -94,7 +92,6 @@ object Utils {
UUID.randomUUID.toString,
KinesisSourceConfig.InitialPosition.TrimHorizon,
KinesisSourceConfig.Retrieval.Polling(1),
PosInt.unsafeFrom(1),
Some(endpoint),
Some(endpoint),
Some(endpoint),
Expand Down
1 change: 0 additions & 1 deletion modules/kinesis/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ snowplow.defaults: {
type: "Polling"
maxRecords: 1000
}
bufferSize: 1
leaseDuration: "10 seconds"
}
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
*/
package com.snowplowanalytics.snowplow.sources.kinesis

import eu.timepit.refined.types.all.PosInt

import io.circe._
import io.circe.config.syntax._
import io.circe.generic.extras.semiauto.deriveConfiguredDecoder
Expand All @@ -24,7 +22,6 @@ case class KinesisSourceConfig(
workerIdentifier: String,
initialPosition: KinesisSourceConfig.InitialPosition,
retrievalMode: KinesisSourceConfig.Retrieval,
bufferSize: PosInt,
customEndpoint: Option[URI],
dynamodbCustomEndpoint: Option[URI],
cloudwatchCustomEndpoint: Option[URI],
Expand All @@ -33,8 +30,6 @@ case class KinesisSourceConfig(

object KinesisSourceConfig {

private implicit val posIntDecoder: Decoder[PosInt] = Decoder.decodeInt.emap(PosInt.from)

sealed trait InitialPosition

object InitialPosition {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package com.snowplowanalytics.snowplow.sources.kinesis

import io.circe.literal._
import com.typesafe.config.ConfigFactory
import eu.timepit.refined.types.all.PosInt
import io.circe.config.syntax.CirceConfigOps
import io.circe.Decoder
import io.circe.generic.semiauto._
Expand Down Expand Up @@ -40,7 +39,6 @@ class KinesisSourceConfigSpec extends Specification {
"initialPosition": {
"type": "TrimHorizon"
},
"bufferSize": 42,
"leaseDuration": "20 seconds"
}
"""
Expand All @@ -52,7 +50,6 @@ class KinesisSourceConfigSpec extends Specification {
c.workerIdentifier must beEqualTo("my-identifier"),
c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
c.bufferSize.value must beEqualTo(42),
c.leaseDuration must beEqualTo(20.seconds)
).reduce(_ and _)
}
Expand All @@ -71,7 +68,6 @@ class KinesisSourceConfigSpec extends Specification {
"initialPosition": {
"type": "TRIM_HORIZON"
},
"bufferSize": 42,
"leaseDuration": "20 seconds"
}
"""
Expand All @@ -83,7 +79,6 @@ class KinesisSourceConfigSpec extends Specification {
c.workerIdentifier must beEqualTo("my-identifier"),
c.initialPosition must beEqualTo(KinesisSourceConfig.InitialPosition.TrimHorizon),
c.retrievalMode must beEqualTo(KinesisSourceConfig.Retrieval.Polling(42)),
c.bufferSize.value must beEqualTo(42),
c.leaseDuration must beEqualTo(20.seconds)
).reduce(_ and _)
}
Expand All @@ -108,7 +103,6 @@ class KinesisSourceConfigSpec extends Specification {
workerIdentifier = System.getenv("HOSTNAME"),
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
bufferSize = PosInt.unsafeFrom(1),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,18 +248,18 @@ private[sources] object LowLevelSource {
*/
private def timedWindows[F[_]: Async, A](config: EventProcessingConfig.TimedWindows): Pipe[F, A, Stream[F, A]] = {
def go(timedPull: Pull.Timed[F, A], current: Option[Queue[F, Option[A]]]): Pull[F, Stream[F, A], Unit] =
timedPull.uncons.flatMap {
case None =>
timedPull.uncons.attempt.flatMap {
case Right(None) =>
current match {
case None => Pull.done
case Some(q) => Pull.eval(q.offer(None)) >> Pull.done
}
case Some((Left(_), next)) =>
case Right(Some((Left(_), next))) =>
current match {
case None => go(next, None)
case Some(q) => Pull.eval(q.offer(None)) >> go(next, None)
}
case Some((Right(chunk), next)) =>
case Right(Some((Right(chunk), next))) =>
current match {
case None =>
val pull = for {
Expand All @@ -272,6 +272,11 @@ private[sources] object LowLevelSource {
case Some(q) =>
Pull.eval(chunk.traverse(a => q.offer(Some(a)))) >> go(next, Some(q))
}
case Left(throwable) =>
current match {
case None => Pull.raiseError[F](throwable)
case Some(q) => Pull.eval(q.offer(None)) >> Pull.raiseError[F](throwable)
}
}

in =>
Expand Down
20 changes: 6 additions & 14 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ object Dependencies {
val betterMonadicFor = "0.3.1"
val kindProjector = "0.13.2"
val collectionCompat = "2.11.0"
val refined = "0.11.1"

// Streams
val fs2Kafka = "3.4.0"
val pubsub = "1.127.3"
val fs2AwsKinesis = "4.1.0"
val awsSdk2 = "2.25.16"
val kinesisClient = "2.5.7"

Expand Down Expand Up @@ -67,18 +65,14 @@ object Dependencies {
val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor
val kindProjector = "org.typelevel" %% "kind-projector" % V.kindProjector cross CrossVersion.full
val collectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % V.collectionCompat
val refined = "eu.timepit" %% "refined" % V.refined

// streams
val fs2Kafka = "com.github.fd4s" %% "fs2-kafka" % V.fs2Kafka
val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub
val fs2AwsKinesis = ("io.laserdisc" %% "fs2-aws-kinesis" % V.fs2AwsKinesis)
.exclude("software.amazon.kinesis", "amazon-kinesis-client")
.exclude("com.amazonaws", "amazon-kinesis-producer")
val arnsSdk2 = "software.amazon.awssdk" % "arns" % V.awsSdk2
val kinesisSdk2 = "software.amazon.awssdk" % "kinesis" % V.awsSdk2
val dynamoDbSdk2 = "software.amazon.awssdk" % "dynamodb" % V.awsSdk2
val cloudwatchSdk2 = "software.amazon.awssdk" % "cloudwatch" % V.awsSdk2
val fs2Kafka = "com.github.fd4s" %% "fs2-kafka" % V.fs2Kafka
val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub
val arnsSdk2 = "software.amazon.awssdk" % "arns" % V.awsSdk2
val kinesisSdk2 = "software.amazon.awssdk" % "kinesis" % V.awsSdk2
val dynamoDbSdk2 = "software.amazon.awssdk" % "dynamodb" % V.awsSdk2
val cloudwatchSdk2 = "software.amazon.awssdk" % "cloudwatch" % V.awsSdk2
val kinesisClient = ("software.amazon.kinesis" % "amazon-kinesis-client" % V.kinesisClient)
.exclude("com.amazonaws", "amazon-kinesis-producer")
.exclude("software.amazon.glue", "schema-registry-build-tools")
Expand Down Expand Up @@ -124,12 +118,10 @@ object Dependencies {

val kinesisDependencies = Seq(
kinesisClient,
fs2AwsKinesis,
arnsSdk2,
kinesisSdk2,
dynamoDbSdk2,
cloudwatchSdk2,
refined,
circeConfig,
circeGeneric,
circeGenericExtra,
Expand Down

0 comments on commit a6b96d0

Please sign in to comment.