Skip to content

Commit

Permalink
enrich-kinesis: add integration test (close #531)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Jan 27, 2022
1 parent 3a7f43a commit d2148a0
Show file tree
Hide file tree
Showing 8 changed files with 492 additions and 0 deletions.
24 changes: 24 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,30 @@ lazy val kinesis = project
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin)
.settings(excludeDependencies ++= Dependencies.Libraries.exclusions)

// Standalone application used to test enrich-kinesis end to end
// (what it does is spelled out in `description` below).
lazy val kinesisIntegrationTests = (project in file("modules/kinesis-integration-tests"))
  .enablePlugins(BuildInfoPlugin, JavaAppPackaging)
  .dependsOn(commonFs2)
  .settings(
    BuildSettings.basicSettings,
    BuildSettings.sbtAssemblySettings
  )
  .settings(
    name := "snowplow-enrich-kinesis-integration-tests",
    description := "App that generates collector payloads, sends them to Kinesis, and check the output of enrich-kinesis",
    // Metadata exposed to the application through the generated BuildInfo object
    buildInfoKeys := Seq[BuildInfoKey](organization, name, version, description),
    buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.kinesis.test.generated"
  )
  .settings(
    Test / parallelExecution := false,
    libraryDependencies ++= Seq(
      Dependencies.Libraries.fs2Aws,
      Dependencies.Libraries.scalacheck
    ),
    addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
  )
  .settings(
    BuildSettings.dockerSettings,
    excludeDependencies ++= Dependencies.Libraries.exclusions
  )

lazy val bench = project
.in(file("modules/bench"))
.dependsOn(pubsub % "test->test")
Expand Down
15 changes: 15 additions & 0 deletions modules/kinesis/src/it/resources/hackathon.hocon
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Kinesis stream names for the enrich-kinesis integration tests.
# They are the same names as the streams created (and deleted afterwards)
# by integrationTests.sh, which passes this file to enrich through --config.
{
# Stream holding the generated collector payloads
"input": {
"streamName": "hackathon-collector-payloads"
}

"output": {
# Stream for the enriched events
"good": {
"streamName": "hackathon-enriched"
}
# Stream for the bad rows
"bad": {
"streamName": "hackathon-bad"
}
}
}

19 changes: 19 additions & 0 deletions modules/kinesis/src/it/resources/iglu_resolver.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-3",
"data": {
"cacheSize": 500,
"cacheTtl": 600,
"repositories": [
{
"name": "Iglu Central",
"priority": 0,
"vendorPrefixes": [ "com.snowplowanalytics" ],
"connection": {
"http": {
"uri": "http://iglucentral.com"
}
}
}
]
}
}
35 changes: 35 additions & 0 deletions modules/kinesis/src/it/resources/integrationTests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/bin/bash
#
# Runs the enrich-kinesis integration tests:
#   1. creates the three Kinesis streams,
#   2. builds enrich-kinesis and starts it in the background,
#   3. builds and runs the integration tests application,
#   4. stops enrich-kinesis and deletes the AWS resources,
#   5. exits with the exit code of the integration tests application.
#
# Environment:
#   ENRICH_TEST_DIR  directory containing hackathon.hocon, iglu_resolver.json and
#                    enrichments/ (defaults to the location that used to be hard-coded)

set -x

export PYTHONWARNINGS="ignore:Unverified HTTPS request"

region=eu-central-1
collectorPayloadsStream=hackathon-collector-payloads
enrichedStream=hackathon-enriched
badRowsStream=hackathon-bad

testDir="${ENRICH_TEST_DIR:-/home/ben/code/enrich/test}"

# PID of the enrich-kinesis process, empty until it has been started
pid=""

# Stops enrich-kinesis (if it was started) and deletes the streams and DynamoDB tables.
# Registered on EXIT so that the resources are also removed when the script stops early.
cleanup() {
  if [ -n "$pid" ]; then
    kill "$pid"
  fi
  for stream in "$collectorPayloadsStream" "$enrichedStream" "$badRowsStream"; do
    aws --region "$region" kinesis delete-stream --stream-name "$stream" --enforce-consumer-deletion
  done
  # NOTE(review): presumably the tables created by the Kinesis consumers of the two apps — confirm
  aws --region "$region" dynamodb delete-table --table-name enrich-kinesis-integration-tests
  aws --region "$region" dynamodb delete-table --table-name snowplow-enrich-kinesis
}
trap cleanup EXIT

# Prints the path of the most recently modified jar in the directory given as $1
# (the directory must end with a slash)
latestJar() {
  ls -1t "$1"*.jar | head -1
}

for stream in "$collectorPayloadsStream" "$enrichedStream" "$badRowsStream"; do
  aws --region "$region" kinesis create-stream --stream-name "$stream" --shard-count 1
done
# Stream creation is asynchronous: block until each stream can actually be used
for stream in "$collectorPayloadsStream" "$enrichedStream" "$badRowsStream"; do
  aws --region "$region" kinesis wait stream-exists --stream-name "$stream"
done

sbt "project kinesis" assembly || exit 1
target="modules/kinesis/target/scala-2.12/"
JAR=$(latestJar "$target")
java -jar "$JAR" --config "$testDir/hackathon.hocon" --iglu-config "$testDir/iglu_resolver.json" --enrichments "$testDir/enrichments/" > enrich-kinesis.log 2>&1 &
pid=$!

sbt "project kinesisIntegrationTests" assembly || exit 1
target="modules/kinesis-integration-tests/target/scala-2.12/"
JAR=$(latestJar "$target")
java -jar "$JAR" "$region" "$collectorPayloadsStream" "$enrichedStream" "$badRowsStream"
res=$?

# cleanup runs through the EXIT trap; the exit code stays the one of the tests
exit $res
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.kinesis.test

import fs2.Stream

import cats.implicits._

import cats.effect.Sync

import io.circe.syntax._
import io.circe.{Encoder, Json, JsonObject}

import org.scalacheck.{Arbitrary, Gen}

import org.joda.time.DateTime

import org.apache.thrift.TSerializer

import java.util.Base64

import com.snowplowanalytics.iglu.core.{ SelfDescribingData, SchemaKey, SchemaVer }
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._

import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload

/**
 * ScalaCheck-based generators of collector payloads for the enrich-kinesis integration tests.
 *
 * Each payload carries one unstructured event (`change_form` or `client_session`).
 * Payloads generated as "invalid" carry a `ue_px` field that is not an encoded
 * self-describing event.
 */
object CollectorPayloadGen {

  private val serializer = new TSerializer()
  private val base64Encoder = Base64.getEncoder()

  /**
   * Same as [[generateRaw]] but with every payload serialized with Thrift.
   *
   * @param nbGoodEvents number of payloads generated as valid
   * @param nbBadRows number of payloads generated as invalid
   */
  def generate[F[_]: Sync](nbGoodEvents: Long, nbBadRows: Long): Stream[F, Array[Byte]] =
    generateRaw(nbGoodEvents, nbBadRows).map(_.toThrift).map(serializer.serialize)

  /**
   * Emits `nbGoodEvents` payloads generated as valid, followed by `nbBadRows` payloads
   * generated as invalid.
   */
  def generateRaw[F[_]: Sync](nbGoodEvents: Long, nbBadRows: Long): Stream[F, CollectorPayload] =
    Stream.repeatEval(runGen(collectorPayloadGen(true))).take(nbGoodEvents) ++
      Stream.repeatEval(runGen(collectorPayloadGen(false))).take(nbBadRows)

  /** Generator of a collector payload, `valid` is forwarded to the generation of the body. */
  private def collectorPayloadGen(valid: Boolean): Gen[CollectorPayload] =
    for {
      vendor <- Gen.const("com.snowplowanalytics.snowplow")
      version <- Gen.const("tp2")
      api = CollectorPayload.Api(vendor, version)

      queryString = Nil

      contentType = Some("application/json")

      body <- bodyGen(valid).map(Some(_))

      name = "scala-tracker_1.0.0"
      encoding = "UTF8"
      hostname = Some("example.acme")
      source = CollectorPayload.Source(name, encoding, hostname)

      timestamp <- Gen.option(DateTime.now)
      ipAddress <- Gen.option(ipAddressGen)
      useragent <- Gen.option(userAgentGen)
      refererUri = None
      headers = Nil
      userId <- Gen.uuid.map(Some(_))
      context = CollectorPayload.Context(timestamp, ipAddress, useragent, refererUri, headers, userId)
    } yield CollectorPayload(api, queryString, contentType, body, source, context)

  /**
   * Generator of the body of the payload: a `payload_data` self-describing JSON
   * containing a single event.
   *
   * @param valid when true `ue_px` holds a base64-encoded unstructured event,
   *              when false it holds a constant that is not an encoded event
   */
  private def bodyGen(valid: Boolean): Gen[String] =
    for {
      p <- Gen.oneOf("web", "mob", "app").withKey("p")
      aid <- Gen.const("enrich-kinesis-integration-tests").withKey("aid")
      e <- Gen.const("ue").withKey("e")
      tv <- Gen.oneOf("scala-tracker_1.0.0", "js_2.0.0", "go_1.2.3").withKey("tv")
      uePx <-
        if (valid)
          ueGen.map(_.toString).map(str => base64Encoder.encodeToString(str.getBytes)).withKey("ue_px")
        else
          // Both branches used to generate a valid event, which made `valid` a no-op.
          // This value is not the encoding of a self-describing JSON, so the event
          // is expected to be rejected by enrich.
          Gen.const("foo").withKey("ue_px")
    } yield SelfDescribingData(
      SchemaKey("com.snowplowanalytics.snowplow", "payload_data", "jsonschema", SchemaVer.Full(1, 0, 4)),
      List(asObject(List(p, aid, e, uePx, tv))).asJson
    ).asJson.toString

  /** Generator of an `unstruct_event` wrapping either a `change_form` or a `client_session`. */
  private def ueGen =
    for {
      sdj <- Gen.oneOf(changeFormGen, clientSessionGen)
    } yield SelfDescribingData(
      SchemaKey("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", SchemaVer.Full(1, 0, 0)),
      sdj.asJson
    ).asJson

  /** Generator of a `change_form` event: `type` can be absent and `value` can be null. */
  private def changeFormGen =
    for {
      formId <- strGen(32, Gen.alphaNumChar).withKey("formId")
      elementId <- strGen(32, Gen.alphaNumChar).withKey("elementId")
      nodeName <- Gen.oneOf(List("INPUT", "TEXTAREA", "SELECT")).withKey("nodeName")
      `type` <- Gen.option(Gen.oneOf(List("button", "checkbox", "color", "date", "datetime", "datetime-local", "email", "file", "hidden", "image", "month", "number", "password", "radio", "range", "reset", "search", "submit", "tel", "text", "time", "url", "week"))).withKeyOpt("type")
      value <- Gen.option(strGen(16, Gen.alphaNumChar)).withKeyNull("value")
    } yield SelfDescribingData(
      SchemaKey("com.snowplowanalytics.snowplow", "change_form", "jsonschema", SchemaVer.Full(1, 0, 0)),
      asObject(List(formId, elementId, nodeName, `type`, value))
    )

  /** Generator of a `client_session` event: `previousSessionId` can be null. */
  private def clientSessionGen =
    for {
      userId <- Gen.uuid.withKey("userId")
      sessionId <- Gen.uuid.withKey("sessionId")
      sessionIndex <- Gen.choose(0, 2147483647).withKey("sessionIndex")
      previousSessionId <- Gen.option(Gen.uuid).withKeyNull("previousSessionId")
      storageMechanism <- Gen.oneOf(List("SQLITE", "COOKIE_1", "COOKIE_3", "LOCAL_STORAGE", "FLASH_LSO")).withKey("storageMechanism")
    } yield SelfDescribingData(
      SchemaKey("com.snowplowanalytics.snowplow", "client_session", "jsonschema", SchemaVer.Full(1, 0, 1)),
      asObject(List(userId, sessionId, sessionIndex, previousSessionId, storageMechanism))
    )

  /** Generator of a string of 1 to `n` characters, each drawn from `gen`. */
  private def strGen(n: Int, gen: Gen[Char]): Gen[String] =
    Gen.chooseNum(1, n).flatMap(len => Gen.listOfN(len, gen).map(_.mkString))

  private def ipAddressGen = Gen.oneOf(ipv4AddressGen, ipv6AddressGen)

  private def ipv4AddressGen =
    for {
      a <- Gen.chooseNum(0, 255)
      b <- Gen.chooseNum(0, 255)
      c <- Gen.chooseNum(0, 255)
      d <- Gen.chooseNum(0, 255)
    } yield s"$a.$b.$c.$d"

  /** Generator of 8 colon-separated groups, each an arbitrary `Short` printed in hexadecimal. */
  private def ipv6AddressGen =
    for {
      a <- Arbitrary.arbitrary[Short]
      b <- Arbitrary.arbitrary[Short]
      c <- Arbitrary.arbitrary[Short]
      d <- Arbitrary.arbitrary[Short]
      e <- Arbitrary.arbitrary[Short]
      f <- Arbitrary.arbitrary[Short]
      g <- Arbitrary.arbitrary[Short]
      h <- Arbitrary.arbitrary[Short]
    } yield f"$a%x:$b%x:$c%x:$d%x:$e%x:$f%x:$g%x:$h%x"

  private def userAgentGen: Gen[String] =
    Gen.oneOf(
      "Mozilla/5.0 (iPad; CPU OS 6_1_3 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10B329 Safari/8536.25",
      "Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372 Safari/604.1",
      "Mozilla/5.0 (Linux; U; Android 2.2; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1",
      "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/13.0.782.112 Safari/535.1",
      "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.0; Trident/5.0)",
      "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
    )

  // Helpers to control null/absence

  /** Builds a JSON object out of the fields that are defined, `None` fields are left out. */
  private def asObject(fields: List[Option[(String, Json)]]): Json =
    JsonObject.fromIterable(fields.collect { case Some(field) => field }).asJson

  implicit class GenOps[A](gen: Gen[A]) {

    /** Turns the generated value into a JSON field called `name` that is always present. */
    def withKey[B](name: String)(implicit enc: Encoder[A]): Gen[Option[(String, Json)]] =
      gen.map { a => Some((name -> a.asJson)) }
  }

  implicit class GenOptOps[A](gen: Gen[Option[A]]) {

    /** Turns the generated value into a JSON field called `name`, absent when the value is `None`. */
    def withKeyOpt(name: String)(implicit enc: Encoder[A]): Gen[Option[(String, Json)]] =
      gen.map {
        case Some(a) => Some((name -> a.asJson))
        case None => None
      }

    /** Turns the generated value into a JSON field called `name`, set to `null` when the value is `None`. */
    def withKeyNull(name: String)(implicit enc: Encoder[A]): Gen[Option[(String, Json)]] =
      gen.map {
        case Some(a) => Some((name -> a.asJson))
        case None => Some((name -> Json.Null))
      }
  }

  /**
   * Convert `Gen` into `F`.
   *
   * The generator is sampled until it returns a value, at most 5 times.
   * If no sample produces a value, the effect fails with a `RuntimeException`.
   */
  def runGen[F[_]: Sync, A](gen: Gen[A]): F[A] = {
    val MaxAttempts = 5
    // `attempt` is the 1-based number of the sample about to be drawn
    def go(attempt: Int): F[A] =
      if (attempt > MaxAttempts)
        Sync[F].raiseError(new RuntimeException(s"Couldn't generate an event after $MaxAttempts attempts"))
      else
        Sync[F].delay(gen.sample).flatMap {
          case Some(a) => Sync[F].pure(a)
          case None => go(attempt + 1)
        }
    go(1)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.kinesis.test

import java.nio.ByteBuffer
import java.util.UUID

import scala.concurrent.duration._

import cats.implicits._

import cats.effect.{Async, Blocker, Timer}

import fs2.Pipe

import fs2.aws.internal.{KinesisProducerClient, KinesisProducerClientImpl}

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import retry.syntax.all._
import retry.RetryPolicies._

import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}

import com.amazonaws.services.kinesis.producer.{KinesisProducerConfiguration, UserRecordResult}

/** Writes byte arrays to a Kinesis stream through the producer client of fs2-aws. */
object KinesisSink {

  private implicit def unsafeLogger[F[_]: Async]: Logger[F] =
    Slf4jLogger.getLogger[F]

  /**
   * Creates a pipe that writes every incoming byte array to Kinesis, one record at a time.
   *
   * @param blocker provides the execution context on which the callbacks of the producer run
   * @param region AWS region set in the producer configuration
   * @param stream name of the stream to write to
   */
  def init[F[_]: Async: Timer](
    blocker: Blocker,
    region: String,
    stream: String
  ): Pipe[F, Array[Byte], Unit] = {
    val kplConfig = new KinesisProducerConfiguration()
      .setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED)
      .setRegion(region)
      .setMetricsLevel("none")

    // One single client, shared by all the records going through the pipe
    val client = new KinesisProducerClientImpl[F](Some(kplConfig))

    payloads => payloads.evalMap(bytes => writeToKinesis(blocker, client, stream, bytes))
  }

  /**
   * Writes one record to `stream` with a random partition key.
   * The write is attempted again, following the retry policy, when its result is not successful.
   */
  private def writeToKinesis[F[_]: Async: Timer](
    blocker: Blocker,
    producer: KinesisProducerClient[F],
    stream: String,
    data: Array[Byte]
  ): F[Unit] = {
    val partitionKey = UUID.randomUUID().toString
    // Exponential backoff starting at 100 milliseconds, with delays capped to 10 seconds
    val backoff = capDelay[F](10.seconds, exponentialBackoff[F](100.millis))

    val attempt: F[UserRecordResult] =
      Async[F]
        .delay(ByteBuffer.wrap(data))
        .flatMap(buffer => producer.putData(stream, partitionKey, buffer))
        .flatMap(future => registerCallback[F](blocker, future))

    attempt
      .retryingOnFailuresAndAllErrors(
        wasSuccessful = _.isSuccessful,
        policy = backoff,
        onFailure = (result, retryDetails) =>
          Logger[F].warn(s"Writing to shard ${result.getShardId()} failed after ${retryDetails.retriesSoFar} retry"),
        // The exception is raised again after it has been logged
        onError = (exception, retryDetails) =>
          Logger[F]
            .error(s"Writing to Kinesis errored after ${retryDetails.retriesSoFar} retry. Error: ${exception.toString}") >>
            Async[F].raiseError(exception)
      )
      .void
  }

  /** Converts the future returned by the producer into `F`, completed by the callback of the future. */
  private def registerCallback[F[_]: Async](blocker: Blocker, f: ListenableFuture[UserRecordResult]): F[UserRecordResult] =
    Async[F].async[UserRecordResult] { complete =>
      val listener = new FutureCallback[UserRecordResult] {
        override def onSuccess(result: UserRecordResult): Unit = complete(Right(result))
        override def onFailure(t: Throwable): Unit = complete(Left(t))
      }
      Futures.addCallback(
        f,
        listener,
        (task: Runnable) => blocker.blockingContext.execute(task)
      )
    }
}
Loading

0 comments on commit d2148a0

Please sign in to comment.