Skip to content

Commit

Permalink
Extend coverage of Transform.transformEventUnstructured unit tests (close #50)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Jan 3, 2024
1 parent 76ee3de commit 2aeb575
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,23 @@
package com.snowplowanalytics.snowplow.loaders.transform

import org.specs2.Specification
import io.circe.Json
import io.circe._
import io.circe.literal._
import io.circe.syntax._

import com.snowplowanalytics.iglu.schemaddl.parquet.Caster.NamedValue
import com.snowplowanalytics.iglu.schemaddl.parquet.{Caster, Type}
import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingData}
import com.snowplowanalytics.snowplow.badrows.{Processor => BadRowProcessor}
import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, SnowplowEvent}
import com.snowplowanalytics.snowplow.eventgen.runGen
import com.snowplowanalytics.snowplow.eventgen.enrich.{SdkEvent => GenSdkEvent}
import com.snowplowanalytics.snowplow.eventgen.protocol.event._
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{Contexts, UnstructEvent}

import java.util.UUID
import java.time.Instant
import java.time.{Instant, LocalDate}
import scala.util.Random

class TransformSpec extends Specification {
import TransformSpec._
Expand All @@ -26,10 +33,13 @@ class TransformSpec extends Specification {
Transform.transformEventUnstructured should
Transform a minimal valid event $e1
Transform a valid event with each different type of atomic field $e2
Create a failed event if a currency field cannot be cast to a decimal $e3
Create a failed event if a currency field cannot be cast to a decimal due to rounding $e3_1
Create a failed event if a currency field cannot be cast to a decimal due to high precision $e3_2
Create additional columns for unstruct events $e4
Create additional columns for contexts, using different columns for different schemas $e5
Create additional columns for contexts, using same column when schemas have same major version $e6
Transform values of event fields correctly $e7
Transform types of event fields correctly $e8
"""

def e1 = {
Expand Down Expand Up @@ -84,7 +94,7 @@ class TransformSpec extends Specification {
}
}

def e3 = {
def e3_1 = {
val event = Event
.minimal(testEventId, testTimestamp, "0.0.0", "0.0.0")
.copy(
Expand All @@ -96,6 +106,18 @@ class TransformSpec extends Specification {
result must beLeft
}

def e3_2 = {
  // 17 significant digits: more precision than the target decimal column allows,
  // so the transform is expected to fail with a bad row (Left).
  val inputEvent = Event
    .minimal(testEventId, testTimestamp, "0.0.0", "0.0.0")
    .copy(
      tr_total = Some(12345678987654321.34) // Too high precision
    )

  Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, inputEvent, schemasToSkip) must beLeft
}

def e4 = {
val data = json"""
{
Expand Down Expand Up @@ -232,6 +254,20 @@ class TransformSpec extends Specification {
}
}

// Property test: for generated events, every transformed column value must equal
// the value read back directly from the event's own fields.
def e7 =
  forall(genEvents(100)) { event =>
    val fieldsByName = getParams(event)
    // NOTE(review): .toOption.get throws NoSuchElementException if the transform
    // returns Left, which gives a poorer failure message than a beRight matcher.
    val namedValues =
      Transform.transformEventUnstructured(badProcessor, TestCaster, TestCirceFolder, event, schemasToSkip).toOption.get
    forall(namedValues) { nv =>
      nv.value must beEqualTo(getFieldValueFromEvent(fieldsByName, nv.name))
    }
  }

// Property test: for generated events, every transformed column's *type label*
// (produced by the type-recording caster/folder) must match the type derived
// from the event's own field.
def e8 =
  forall(genEvents(100)) { event =>
    val fieldsByName = getParams(event)
    // NOTE(review): .toOption.get throws if the transform fails; see e7.
    val namedValues =
      Transform.transformEventUnstructured(badProcessor, typeCaster, typeCirceFolder, event, schemasToSkip).toOption.get
    forall(namedValues) { nv =>
      nv.value must beEqualTo(getFieldTypeFromEvent(fieldsByName, nv.name))
    }
  }

}

object TransformSpec {
Expand All @@ -245,4 +281,99 @@ object TransformSpec {
val testSchemaKey701 = SchemaKey.fromUri("iglu:com.example/mySchema/jsonschema/7-0-1").toOption.get
val testSchemaKey702 = SchemaKey.fromUri("iglu:com.example/mySchema/jsonschema/7-0-2").toOption.get
val testSchemaKey801 = SchemaKey.fromUri("iglu:com.example/mySchema/jsonschema/8-0-1").toOption.get

// A Caster that discards values and records only the *type* of each cast,
// as a label string. Used by e8 to assert which caster branch each atomic
// field goes through.
val typeCaster = new Caster[String] {
override def nullValue: String = "null"
override def jsonValue(v: Json): String = "json"
override def stringValue(v: String): String = "string"
override def booleanValue(v: Boolean): String = "boolean"
override def intValue(v: Int): String = "int"
override def longValue(v: Long): String = "long"
// NOTE(review): decimals are deliberately labelled "double" so that decimal
// and double atomic fields compare equal in e8 — confirm this is intended
// rather than a copy-paste of the doubleValue label.
override def doubleValue(v: Double): String = "double"
override def decimalValue(unscaled: BigInt, details: Type.Decimal): String = "double"
override def timestampValue(v: Instant): String = "timestamp"
override def dateValue(v: LocalDate): String = "date"
override def arrayValue(vs: List[String]): String = "array"
override def structValue(vs: List[Caster.NamedValue[String]]): String = "struct"
}

// A circe Json.Folder that maps each JSON value to the name of its JSON type,
// mirroring typeCaster but for raw JSON (contexts / unstruct_event columns).
val typeCirceFolder = new Json.Folder[String] {
override def onNull: String = "null"
override def onBoolean(value: Boolean): String = "boolean"
override def onNumber(value: JsonNumber): String = "number"
override def onString(value: String): String = "string"
override def onArray(value: Vector[Json]): String = "array"
override def onObject(value: JsonObject): String = "object"
}

/** Generates `n` random SDK events (one event per generated payload),
  * with enrichments enabled, for property testing.
  */
def genEvents(n: Int): List[Event] =
  List
    .fill(n) {
      runGen(
        GenSdkEvent.gen(
          eventPerPayloadMin = 1,
          eventPerPayloadMax = 1,
          now                = Instant.now,
          frequencies        = EventFrequencies(1, 1, 1, 1, 1, 1, UnstructEventFrequencies(1, 1, 1)),
          generateEnrichments = true
        ),
        new Random()
      )
    }
    .flatten

/** Builds a field-name -> field-value map for an event by zipping the atomic
  * field names against the event's product elements (contexts and
  * unstruct_event are handled separately under their own keys).
  */
def getParams(event: Event): Map[String, Any] = {
  // Drop the two non-atomic members; the remaining product elements line up
  // positionally with AtomicFields.static.
  val atomicValues = event.productIterator.filterNot {
    case _: Contexts | _: UnstructEvent => true
    case _                              => false
  }
  // We can't use 'Product.productElementNames' to recover field names because
  // it was only introduced in Scala 2.13 and these tests also run on 2.12.
  val atomicNames = AtomicFields.static.map(_.name)
  atomicNames.zip(atomicValues.toList).toMap ++ Map(
    "contexts"       -> event.contexts,
    "unstruct_event" -> event.unstruct_event
  )
}

/** Resolves the expected JSON value for a transformed column: shredded JSON
  * for contexts/unstruct columns, otherwise the atomic field converted via
  * anyToJson (with None mapped to JSON null).
  */
def getFieldValueFromEvent(eventMap: Map[String, Any], fieldName: String): Json =
  fieldName match {
    case name if name.startsWith("contexts_") =>
      eventMap("contexts").asInstanceOf[Contexts].toShreddedJson(name)
    case name if name.startsWith("unstruct_") =>
      // NOTE(review): .get assumes the generated event always carries an
      // unstruct event when such a column exists.
      eventMap("unstruct_event").asInstanceOf[UnstructEvent].toShreddedJson.get._2
    case name =>
      eventMap(name) match {
        case Some(inner) => anyToJson(inner) // optional atomic field, present
        case None        => Json.Null        // optional atomic field, absent
        case raw         => anyToJson(raw)   // mandatory atomic field
      }
  }

/** Resolves the expected type label for a transformed column: contexts columns
  * are arrays, unstruct columns are objects, atomic fields are classified by
  * anyToType (with None mapped to "null").
  */
def getFieldTypeFromEvent(eventMap: Map[String, Any], fieldName: String): String =
  fieldName match {
    case name if name.startsWith("contexts_") => "array"
    case name if name.startsWith("unstruct_") => "object"
    case name =>
      eventMap(name) match {
        case Some(inner) => anyToType(inner)
        case None        => "null"
        case raw         => anyToType(raw)
      }
  }

// Converts an atomic event field to circe Json via the implicit Encoder for
// its runtime type. Left byte-identical: swapping `.asJson` for explicit
// Json.fromX constructors could change edge-case encoding (e.g. Double),
// since the exact output is fixed by circe's implicit encoders.
// Throws for any type not covered below.
def anyToJson(value: Any): Json =
value match {
case v: String => v.asJson
case v: Int => v.asJson
case v: Double => v.asJson
case v: Instant => v.asJson // encoded as an ISO-8601 string by circe — TODO confirm encoder in scope
case v: UUID => v.asJson
case v: Boolean => v.asJson
case _ => throw new Exception(s"Value with unexpected type: $value")
}

/** Classifies an atomic field value into the type label produced by
  * typeCaster (UUIDs are cast as strings). Throws for unsupported types.
  */
def anyToType(value: Any): String =
  value match {
    // String and UUID both end up in the string column type.
    case _: String | _: UUID => "string"
    case _: Boolean          => "boolean"
    case _: Int              => "int"
    case _: Double           => "double"
    case _: Instant          => "timestamp"
    case other               => throw new Exception(s"Value with unexpected type: $other")
  }
}
4 changes: 3 additions & 1 deletion project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ object BuildSettings {
ThisBuild / autoAPIMappings := true,
ThisBuild / dynverVTagPrefix := false, // Otherwise git tags required to have v-prefix
ThisBuild / dynverSeparator := "-", // to be compatible with docker

resolvers ++= Seq(
("Snowplow Analytics Maven repo" at "http://maven.snplow.com/releases/").withAllowInsecureProtocol(true)
),
Compile / resourceGenerators += Def.task {
val license = (Compile / resourceManaged).value / "META-INF" / "LICENSE"
IO.copyFile(file("LICENSE.md"), license)
Expand Down
11 changes: 7 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ object Dependencies {
val specs2 = "4.20.0"
val catsEffectSpecs2 = "1.5.0"
val localstack = "1.19.0"
val eventGen = "0.7.0"
}

val catsEffectKernel = "org.typelevel" %% "cats-effect-kernel" % V.catsEffect
Expand Down Expand Up @@ -105,9 +106,10 @@ object Dependencies {
val analyticsSdk = "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % V.analyticsSdk

// tests
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
val catsEffectTestkit = "org.typelevel" %% "cats-effect-testkit" % V.catsEffect % Test
val catsEffectSpecs2 = "org.typelevel" %% "cats-effect-testing-specs2" % V.catsEffectSpecs2 % Test
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
val catsEffectTestkit = "org.typelevel" %% "cats-effect-testkit" % V.catsEffect % Test
val catsEffectSpecs2 = "org.typelevel" %% "cats-effect-testing-specs2" % V.catsEffectSpecs2 % Test
val eventGen = "com.snowplowanalytics" %% "snowplow-event-generator-core" % V.eventGen % Test

val streamsDependencies = Seq(
cats,
Expand Down Expand Up @@ -189,6 +191,7 @@ object Dependencies {
analyticsSdk,
specs2,
catsEffectSpecs2,
slf4jSimple % Test
slf4jSimple % Test,
eventGen
)
}

0 comments on commit 2aeb575

Please sign in to comment.