Kinesis: create enrich asset based on fs2 (close #480)
benjben committed Sep 2, 2021
1 parent 1408928 commit 42c4153
Showing 21 changed files with 813 additions and 98 deletions.
19 changes: 19 additions & 0 deletions .github/workflows/ci.yml
@@ -138,6 +138,25 @@ jobs:
      - name: Build and publish Enrich PubSub Docker image
        run: sbt "project pubsub" docker:publish

  publish_kinesis:
    needs: test
    if: startsWith(github.ref, 'refs/tags/')
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: coursier/cache-action@v6
      - name: Set up JDK 11
        uses: actions/setup-java@v1
        with:
          java-version: 11
      - name: Docker login
        run: docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD
        env:
          DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }}
          DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}
      - name: Build and publish Enrich Kinesis Docker image
        run: sbt "project kinesis" docker:publish

  publish_beam:
    needs: test
    if: startsWith(github.ref, 'refs/tags/')
27 changes: 26 additions & 1 deletion build.sbt
@@ -19,7 +19,7 @@
lazy val root = project.in(file("."))
  .settings(name := "enrich")
  .settings(BuildSettings.basicSettings)
  .aggregate(common, pubsub, beam, streamCommon, streamKinesis, streamKafka, streamNsq, streamStdin)
  .aggregate(common, pubsub, kinesis, beam, streamCommon, streamKinesis, streamKafka, streamNsq, streamStdin)

lazy val common = project
  .in(file("modules/common"))
@@ -268,6 +268,31 @@ lazy val pubsub = project
  .settings(BuildSettings.dockerSettings)
  .enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin)

lazy val kinesis = project
  .in(file("modules/kinesis"))
  .dependsOn(commonFs2)
  .settings(BuildSettings.basicSettings)
  .settings(BuildSettings.formatting)
  .settings(BuildSettings.scoverageSettings)
  .settings(BuildSettings.sbtAssemblySettings)
  .settings(
    name := "snowplow-enrich-kinesis",
    description := "High-performance app built on top of functional streams that enriches Snowplow events from Kinesis",
    buildInfoKeys := Seq[BuildInfoKey](organization, name, version, description),
    buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.kinesis.generated",
    Docker / packageName := "snowplow/snowplow-enrich-kinesis",
  )
  .settings(Test / parallelExecution := false)
  .settings(
    libraryDependencies ++= Seq(
      Dependencies.Libraries.fs2Aws,
    ),
    addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
  )
  .enablePlugins(BuildInfoPlugin)
  .settings(BuildSettings.dockerSettings)
  .enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin)

lazy val bench = project
  .in(file("modules/bench"))
  .dependsOn(pubsub % "test->test")
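The new module's only extra library is Dependencies.Libraries.fs2Aws, the fs2 wrapper around the AWS Kinesis client libraries. As a rough, non-authoritative sketch of what that entry could look like in the project's Dependencies.scala — the artifact coordinates and version below are assumptions for illustration, not values taken from this commit:

// project/Dependencies.scala (hypothetical excerpt)
import sbt._

object Dependencies {

  object V {
    // Assumed version; the commit does not show the real one.
    val fs2Aws = "3.1.1"
  }

  object Libraries {
    // fs2-aws exposes the KCL consumer and the Kinesis producer as fs2 streams;
    // the exact artifact name is also an assumption here.
    val fs2Aws = "io.laserdisc" %% "fs2-aws" % V.fs2Aws
  }
}
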
190 changes: 190 additions & 0 deletions config/config.kinesis.hocon.sample
@@ -0,0 +1,190 @@
{
  # Collector input
  "input": {
    "type": "Kinesis"

    # Name of the application that the KCL should assume
    "appName": "enrich-kinesis"

    # Name of the Kinesis stream to read from
    "streamName": "collector-payloads"

    # AWS region in which the Kinesis stream resides.
    "region": "eu-central-1"

    # Either TRIM_HORIZON or LATEST
    "initialPosition": "TRIM_HORIZON"

    # Optional, set the mode for retrieving records. Default is FanOut
    "retrievalMode": "FanOut"
    # "retrievalMode": {
    #   "type": "Polling"
    #   "maxRecords": 1000
    # }

    # Optional, configure the checkpointer.
    "checkpointSettings": {
      # The max number of records to aggregate before checkpointing the records.
      # Default is 1000.
      "maxBatchSize": 1000

      # The max amount of time to wait before checkpointing the records.
      # Default is 10 seconds.
      "maxBatchWait": 10 seconds
    }
  }

  # Enriched events output
  "good": {
    "type": "Kinesis"

    # Name of the Kinesis stream to write to
    "streamName": "enriched"

    # AWS region in which the Kinesis stream resides.
    "region": "eu-central-1"

    # Optional. How the output stream/topic will be partitioned in Kinesis
    # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
    # user_ipaddress, domain_sessionid, user_fingerprint
    # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to learn what
    # each possible partition key corresponds to.
    # If not set, the partition key will be a random UUID.
    # "partitionKey" = "user_id"

    # The delay threshold to use for batching
    # Default is 200 milliseconds
    "delayThreshold": 200 milliseconds

    # Max number of items in the batch to collect before emitting
    # Default is 500
    "maxBatchSize": 500

    # Max size of the batch in bytes before emitting
    # Default is 5MB
    "maxBatchBytes": 5000000

    # Minimum and maximum backoff periods
    "backoffPolicy": {
      # Default is 100 ms
      "minBackoff": 100 milliseconds
      # Default is 10 s
      "maxBackoff": 10 seconds
    }
  }

  # PII events output
  "pii": {
    "type": "Kinesis"

    # Name of the Kinesis stream to write to
    "streamName": "pii"

    # AWS region in which the Kinesis stream resides.
    "region": "eu-central-1"

    # Optional. How the output stream/topic will be partitioned in Kinesis
    # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
    # user_ipaddress, domain_sessionid, user_fingerprint
    # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to learn what
    # each possible partition key corresponds to.
    # If not set, the partition key will be a random UUID.
    # "partitionKey" = "user_id"

    # The delay threshold to use for batching
    # Default is 200 milliseconds
    "delayThreshold": 200 milliseconds

    # Max number of items in the batch to collect before emitting
    # Default is 500
    "maxBatchSize": 500

    # Max size of the batch in bytes before emitting
    # Default is 5MB
    "maxBatchBytes": 5000000

    # Minimum and maximum backoff periods
    "backoffPolicy": {
      # Default is 100 ms
      "minBackoff": 100 milliseconds
      # Default is 10 s
      "maxBackoff": 10 seconds
    }
  }

  # Bad rows output
  "bad": {
    "type": "Kinesis"

    # Name of the Kinesis stream to write to
    "streamName": "bad"

    # AWS region in which the Kinesis stream resides.
    "region": "eu-central-1"

    # The delay threshold to use for batching
    # Default is 200 milliseconds
    "delayThreshold": 200 milliseconds

    # Max number of items in the batch to collect before emitting
    # Default is 500
    "maxBatchSize": 500

    # Max size of the batch in bytes before emitting
    # Default is 5MB
    "maxBatchBytes": 5000000

    # Minimum and maximum backoff periods
    "backoffPolicy": {
      # Default is 100 ms
      "minBackoff": 100 milliseconds
      # Default is 10 s
      "maxBackoff": 10 seconds
    }
  }

  # Optional. Period after which enrich assets should be checked for updates.
  # No assets will be updated if the key is absent.
  "assetsUpdatePeriod": "7 days"

  "monitoring": {

    # Optional, for tracking runtime exceptions
    "sentry": {
      "dsn": "http://sentry.acme.com"
    }

    # Optional, configure how metrics are reported
    "metrics": {

      # Send metrics to a StatsD server on localhost
      "statsd": {

        "hostname": "localhost"
        "port": 8125

        # Required, how frequently to report metrics
        "period": "10 seconds"

        # Any key-value pairs to be tagged on every StatsD metric
        "tags": {
          "app": enrich
        }

        # Optional, override the default metric prefix
        # "prefix": "snowplow.enrich."
      }

      # Log to stdout using Slf4j
      "stdout": {
        "period": "10 seconds"

        # Optional, override the default metric prefix
        # "prefix": "snowplow.enrich."
      }

      # Optional, CloudWatch metrics are enabled by default
      "cloudwatch": false
    }
  }
}
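
The partitionKey comments above describe field-based partitioning with a random-UUID fallback. A minimal sketch of that selection logic, under the assumption that the listed fields are available on the enriched event — the EventFields stand-in and the partitionKey helper below are illustrative names, not code added by this commit:

import java.util.UUID

// Illustrative stand-in for the enriched event fields relevant to partitioning;
// the real enriched event in this codebase has many more fields.
final case class EventFields(
  eventId: String,
  eventFingerprint: Option[String],
  domainUserid: Option[String],
  networkUserid: Option[String],
  userIpaddress: Option[String],
  domainSessionid: Option[String],
  userFingerprint: Option[String]
)

// Resolve the Kinesis partition key: use the configured field when it is set
// and present on the event, otherwise fall back to a random UUID.
def partitionKey(configured: Option[String], event: EventFields): String =
  configured
    .flatMap {
      case "event_id"          => Some(event.eventId)
      case "event_fingerprint" => event.eventFingerprint
      case "domain_userid"     => event.domainUserid
      case "network_userid"    => event.networkUserid
      case "user_ipaddress"    => event.userIpaddress
      case "domain_sessionid"  => event.domainSessionid
      case "user_fingerprint"  => event.userFingerprint
      case _                   => None
    }
    .getOrElse(UUID.randomUUID().toString)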
5 changes: 0 additions & 5 deletions config/config.pubsub.hocon.sample
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
{
"auth": {
# "Gcp" is the only valid option now
"type": "Gcp"
}

# Collector input
"input": {
"type": "PubSub"
Environment.scala
@@ -132,7 +132,7 @@ object Environment {
goodSink: Resource[F, AttributedByteSink[F]],
piiSink: Option[Resource[F, AttributedByteSink[F]]],
badSink: Resource[F, ByteSink[F]],
checkpointer: Pipe[F, A, Unit],
checkpointer: Resource[F, Pipe[F, A, Unit]],
getPayload: A => Array[Byte],
processor: Processor
): Resource[F, Environment[F, A]] = {
@@ -152,6 +152,7 @@
case Some(dsn) => Resource.eval[F, Option[SentryClient]](Sync[F].delay(Sentry.init(dsn.toString).some))
case None => Resource.pure[F, Option[SentryClient]](none[SentryClient])
}
chkpt <- checkpointer
_ <- Resource.eval(pauseEnrich.set(false) *> Logger[F].info("Enrich environment initialized"))
} yield Environment[F, A](
client,
@@ -164,7 +165,7 @@
good,
pii,
bad,
checkpointer,
chkpt,
getPayload,
sentry,
metrics,
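The change above turns the checkpointer from a plain Pipe[F, A, Unit] into a Resource[F, Pipe[F, A, Unit]], so building it can itself acquire and release state (for Kinesis, whatever the KCL-backed checkpointing needs) in the same for-comprehension as the other resources, as the new `chkpt <- checkpointer` binding shows. A minimal sketch of why that shape composes, assuming cats-effect 2 and fs2 as used elsewhere in this commit — the logging checkpointer is invented purely for illustration:

import cats.effect.{IO, Resource}
import fs2.Pipe

// A checkpointer whose construction needs managed setup/teardown.
// Here the "setup" is just logging; in the Kinesis asset it would be
// whatever state checkpointing requires.
def loggingCheckpointer[A]: Resource[IO, Pipe[IO, A, Unit]] =
  Resource
    .make(IO(println("checkpointer acquired")))(_ => IO(println("checkpointer released")))
    .map(_ => (in: fs2.Stream[IO, A]) => in.evalMap(a => IO(println(s"checkpointing $a"))))

// Because it is a Resource, it can be acquired alongside sinks and other
// resources, exactly as Environment.make now does with `chkpt <- checkpointer`.
val environmentLike: Resource[IO, Pipe[IO, Int, Unit]] =
  for {
    chkpt <- loggingCheckpointer[Int]
  } yield chkpt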
Run.scala
@@ -28,7 +28,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.snowplow.badrows.Processor

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Authentication, Input, Output}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Input, Monitoring, Output}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{CliConfig, ParsedConfigs}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{Sink, Source}

@@ -42,11 +42,10 @@ object Run {
version: String,
description: String,
ec: ExecutionContext,
mkSource: (Blocker, Authentication, Input) => Stream[F, A],
mkGoodSink: (Blocker, Authentication, Output) => Resource[F, AttributedByteSink[F]],
mkPiiSink: (Blocker, Authentication, Output) => Resource[F, AttributedByteSink[F]],
mkBadSink: (Blocker, Authentication, Output) => Resource[F, ByteSink[F]],
checkpointer: Pipe[F, A, Unit],
mkSource: (Blocker, Input, Option[Monitoring]) => (Stream[F, A], Resource[F, Pipe[F, A, Unit]]),
mkGoodSink: (Blocker, Output, Option[Monitoring]) => Resource[F, AttributedByteSink[F]],
mkPiiSink: (Blocker, Output, Option[Monitoring]) => Resource[F, AttributedByteSink[F]],
mkBadSink: (Blocker, Output, Option[Monitoring]) => Resource[F, ByteSink[F]],
getPayload: A => Array[Byte],
ordered: Boolean
): F[ExitCode] =
@@ -61,38 +60,40 @@
_ <- Logger[F].info(s"Initialising resources for $name $version")
processor = Processor(name, version)
file = parsed.configFile
goodSink = initAttributedSink(blocker, file.auth, file.good, mkGoodSink)
piiSink = file.pii.map(out => initAttributedSink(blocker, file.auth, out, mkPiiSink))
goodSink = initAttributedSink(blocker, file.good, file.monitoring, mkGoodSink)
piiSink = file.pii.map(out => initAttributedSink(blocker, out, file.monitoring, mkPiiSink))
badSink = file.bad match {
case Output.FileSystem(path) =>
Sink.fileSink[F](path, blocker)
case _ =>
mkBadSink(blocker, file.auth, file.bad)
mkBadSink(blocker, file.bad, file.monitoring)
}
exit <-
(file.auth, file.input) match {
case (_, p: Input.FileSystem) =>
file.input match {
case p: Input.FileSystem =>
val (source, checkpointer) = Source.filesystem[F](blocker, p.dir)
val env = Environment
.make[F, Array[Byte]](
blocker,
ec,
parsed,
Source.filesystem[F](blocker, p.dir),
source,
goodSink,
piiSink,
badSink,
_.void,
checkpointer,
identity,
processor
)
runEnvironment[F, Array[Byte]](env, false)
case _ =>
val (source, checkpointer) = mkSource(blocker, file.input, file.monitoring)
val env = Environment
.make[F, A](
blocker,
ec,
parsed,
mkSource(blocker, file.auth, file.input),
source,
goodSink,
piiSink,
badSink,
@@ -111,15 +112,15 @@

private def initAttributedSink[F[_]: Concurrent: ContextShift: Timer](
blocker: Blocker,
auth: Authentication,
output: Output,
mkGoodSink: (Blocker, Authentication, Output) => Resource[F, AttributedByteSink[F]],
monitoring: Option[Monitoring],
mkGoodSink: (Blocker, Output, Option[Monitoring]) => Resource[F, AttributedByteSink[F]],
): Resource[F, AttributedByteSink[F]] =
output match {
case Output.FileSystem(path) =>
Sink.fileSink[F](path, blocker).map(sink => row => sink(row.data))
case _ =>
mkGoodSink(blocker, auth, output)
mkGoodSink(blocker, output, monitoring)
}

private def runEnvironment[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer, A](
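With the new signature, mkSource returns both the stream of records and the Resource yielding their checkpointer, mirroring what Source.filesystem provides in the FileSystem branch above. A hedged sketch of a trivial implementation with that shape — the in-memory source below is invented for illustration and is not the Kinesis source added by this commit; only the Input and Monitoring config types are taken from the imports shown above:

import cats.effect.{Blocker, IO, Resource}
import fs2.{Pipe, Stream}

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Input, Monitoring}

// Same shape as the new mkSource parameter:
// (Blocker, Input, Option[Monitoring]) => (Stream[F, A], Resource[F, Pipe[F, A, Unit]])
def inMemorySource(
  blocker: Blocker,
  input: Input,
  monitoring: Option[Monitoring]
): (Stream[IO, Array[Byte]], Resource[IO, Pipe[IO, Array[Byte], Unit]]) = {
  val records: Stream[IO, Array[Byte]] =
    Stream.emits(List("a", "b", "c")).map(_.getBytes("UTF-8")).covary[IO]
  // Nothing to acquire for an in-memory source, so the checkpointer is a no-op.
  val checkpointer: Resource[IO, Pipe[IO, Array[Byte], Unit]] =
    Resource.pure[IO, Pipe[IO, Array[Byte], Unit]](_.void)
  (records, checkpointer)
}

Keeping the stream and its checkpointer paired in one tuple lets Run thread them both into Environment.make, exactly as the non-filesystem branch above does.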