Skip to content

Commit

Permalink
Validate enriched event against atomic schema before emitting (close #…
Browse files Browse the repository at this point in the history
  • Loading branch information
voropaevp authored and benjben committed Jan 25, 2022
1 parent b07c0d0 commit 871d400
Show file tree
Hide file tree
Showing 62 changed files with 5,499 additions and 484 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import java.util.concurrent.TimeUnit

import cats.effect.{ContextShift, IO, Clock, Blocker}

import io.circe.literal._

import fs2.Stream

import com.snowplowanalytics.iglu.client.Client
Expand All @@ -43,10 +45,41 @@ class EnrichBench {

implicit val ioClock: Clock[IO] = Clock.create[IO]

// Iglu client used by the benchmarks, parsed from the inline resolver configuration below.
// The configuration declares a cache size of 500 and two HTTP repositories ("Iglu Central"
// and "Iglu Central - GCP Mirror"), both limited to the `com.snowplowanalytics` vendor prefix.
// The trailing `.rethrowT.unsafeRunSync()` runs the parse eagerly where this value is defined;
// presumably a configuration that fails to parse surfaces here as a thrown exception —
// confirm against `Client.parseDefault`.
val client = Client.parseDefault[IO](json"""
{
"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
"data": {
"cacheSize": 500,
"repositories": [
{
"name": "Iglu Central",
"priority": 0,
"vendorPrefixes": [ "com.snowplowanalytics" ],
"connection": {
"http": {
"uri": "http://iglucentral.com"
}
}
},
{
"name": "Iglu Central - GCP Mirror",
"priority": 1,
"vendorPrefixes": [ "com.snowplowanalytics" ],
"connection": {
"http": {
"uri": "http://mirror01.iglucentral.com"
}
}
}
]
}
}
""").rethrowT.unsafeRunSync()

/**
 * Benchmarks `Enrich.enrichWith` applied to `state.raw` with an empty `EnrichmentRegistry()`
 * and a callback that ignores its `Option[Long]` argument, running the resulting `IO`
 * synchronously with `unsafeRunSync()`.
 *
 * NOTE(review): the two consecutive `enrichWith` calls differ only in their arguments
 * (`state.blocker, Client.IgluCentral` versus `client`). They look like the removed and the
 * added side of a diff hunk rather than two intended calls — confirm which one should remain.
 */
@Benchmark
def measureEnrichWithMinimalPayload(state: EnrichBench.BenchState) = {
// Bring the ContextShift held by the benchmark state into implicit scope for the call(s) below.
implicit val CS: ContextShift[IO] = state.contextShift
Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), state.blocker, Client.IgluCentral, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync()
Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), client, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync()
}

@Benchmark
Expand Down Expand Up @@ -83,19 +116,19 @@ object EnrichBench {
raw = EnrichSpec.payload[IO]

// Benchmark input stream: collector payloads that differ only in the `ip` querystring
// parameter (125.12.2.40 through 125.12.2.44), each prepended to `EnrichSpec.querystring`.
// The emitted list is repeated 10 times via `repeatN(10)`, each payload is converted with
// `toRaw` and wrapped in a `Payload` whose second argument is `IO.unit`, and the stream is
// lifted into `IO` with `covary`.
// NOTE(review): every `.copy(` line appears twice, once with the misspelt
// `colllectorPayload` and once with `collectorPayload`; these look like the removed and
// added sides of a diff hunk — confirm that only the corrected spelling should remain.
val input = Stream.emits(List(
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.40") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.41") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.42") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.43") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.44") :: EnrichSpec.querystring
),
)).repeatN(10).map(cp => Payload(cp.toRaw, IO.unit)).covary[IO]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ class EtlPipelineBench {

/**
 * Benchmarks `EtlPipeline.processEvents` in `IO` for a single valid collector payload,
 * using the adapter registry, `IO` enrichment registry and date-time held by the benchmark
 * state together with `Client.IgluCentral`, and running the result with `unsafeRunSync()`.
 *
 * NOTE(review): `val payload` is defined twice in a row, once from the misspelt
 * `colllectorPayload` and once from `collectorPayload`; these look like the removed and
 * added sides of a diff hunk — confirm that only the corrected spelling should remain.
 */
@Benchmark
def measureProcessEventsIO(state: EtlPipelineBench.BenchState) = {
val payload = EnrichSpec.colllectorPayload
val payload = EnrichSpec.collectorPayload
EtlPipeline.processEvents[IO](state.adapterRegistry, state.enrichmentRegistryIo, Client.IgluCentral, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))).unsafeRunSync()
}

/**
 * Benchmarks `EtlPipeline.processEvents` in `Id` for a single valid collector payload,
 * using the adapter registry, `Id` enrichment registry, `Id` client and date-time held by
 * the benchmark state. Unlike the `IO` variant there is no `unsafeRunSync()` call; the
 * value of `processEvents[Id]` is returned as is.
 *
 * NOTE(review): `val payload` is defined twice in a row, once from the misspelt
 * `colllectorPayload` and once from `collectorPayload`; these look like the removed and
 * added sides of a diff hunk — confirm that only the corrected spelling should remain.
 */
@Benchmark
def measureProcessEventsId(state: EtlPipelineBench.BenchState) = {
val payload = EnrichSpec.colllectorPayload
val payload = EnrichSpec.collectorPayload
EtlPipeline.processEvents[Id](state.adapterRegistry, state.enrichmentRegistryId, state.clientId, Enrich.processor, state.dateTime, Validated.Valid(Some(payload)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ThriftLoaderBench {

/**
 * Benchmarks turning the enriched event held by the benchmark state (`state.event`) into
 * its output form through the `Enrich` object.
 *
 * NOTE(review): `Enrich.encodeEvent` and `Enrich.serializeEnriched` are called back to back
 * on the same event; these look like the removed and added sides of a diff hunk (a rename)
 * rather than two intended calls — confirm which one should remain.
 */
@Benchmark
def measureNormalize(state: ThriftLoaderBench.BenchState) = {
Enrich.encodeEvent(state.event)
Enrich.serializeEnriched(state.event)
}
}

Expand All @@ -42,7 +42,7 @@ object ThriftLoaderBench {

@Setup(Level.Trial)
def setup(): Unit = {
data = EnrichSpec.colllectorPayload.toRaw
data = EnrichSpec.collectorPayload.toRaw

event = new EnrichedEvent()
event.setApp_id("foo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ object Enrich {
): BadRow.GenericError = {
val base64 = new String(Base64.getEncoder.encode(row))
val rawPayload = BadRowPayload.RawPayload(base64)
val failure = Failure.GenericFailure(time, NonEmptyList.one(error.toString))
val failure = Failure.GenericFailure(time, NonEmptyList.one(ConversionUtils.cleanStackTrace(error)))
BadRow.GenericError(processor, failure, rawPayload)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "com.acme",
"name": "output",
"format": "jsonschema",
"version": "1-0-0"
},
"properties": {
"output": {
"type": "string"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "com.acme",
"name": "test",
"format": "jsonschema",
"version": "1-0-1"
},
"properties": {
"path": {
"properties": {
"id": {
"type": "integer"
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Schema for a CallRail call completion event",
"self": {
"vendor": "com.callrail",
"name": "call_complete",
"format": "jsonschema",
"version": "1-0-2"
},

"type": "object",
"properties": {
"answered": {
"type": ["boolean", "null"]
},
"customer_city": {
"type": ["string", "null"]
},
"customer_country": {
"type": ["string", "null"]
},
"customer_name": {
"type": ["string", "null"]
},
"customer_phone_number": {
"type": ["string", "null"]
},
"customer_state": {
"type": ["string", "null"]
},
"customer_zip": {
"type": ["string", "null"]
},
"callercity": {
"type": ["string", "null"]
},
"callercountry": {
"type": ["string", "null"]
},
"callername": {
"type": ["string", "null"]
},
"callernum": {
"type": ["string", "null"]
},
"callerstate": {
"type": ["string", "null"]
},
"callerzip": {
"type": ["string", "null"]
},
"callsource": {
"type": ["string", "null"]
},
"datetime": {
"type": "string",
"format": "date-time"
},
"destinationnum": {
"type": ["string", "null"]
},
"duration": {
"type": ["number", "null"]
},
"first_call": {
"type": ["boolean", "null"]
},
"device_type": {
"type": ["string", "null"]
},
"ga": {
"type": ["string", "null"]
},
"gclid": {
"type": ["string", "null"]
},
"id": {
"type": "string"
},
"ip": {
"type": ["string", "null"]
},
"keywords": {
"type": ["string", "null"]
},
"kissmetrics_id": {
"type": ["string", "null"]
},
"landingpage": {
"type": ["string", "null"]
},
"recording": {
"type": ["string", "null"]
},
"referrer": {
"type": ["string", "null"]
},
"referrermedium": {
"type": ["string", "null"]
},
"trackingnum": {
"type": ["string", "null"]
},
"transcription": {
"type": ["string", "null"]
},
"utm_campaign": {
"type": ["string", "null"]
},
"utm_content": {
"type": ["string", "null"]
},
"utm_medium": {
"type": ["string", "null"]
},
"utm_source": {
"type": ["string", "null"]
},
"utm_term": {
"type": ["string", "null"]
},
"utma": {
"type": ["string", "null"]
},
"utmb": {
"type": ["string", "null"]
},
"utmc": {
"type": ["string", "null"]
},
"utmv": {
"type": ["string", "null"]
},
"utmx": {
"type": ["string", "null"]
},
"utmz": {
"type": ["string", "null"]
}
},
"required": ["datetime", "id"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Schema for a Google Analytics hit entity",
"self": {
"vendor": "com.google.analytics.measurement-protocol",
"name": "hit",
"format": "jsonschema",
"version": "1-0-0"
},

"type": "object",
"properties": {
"type": {
"enum": [
"event",
"exception",
"item",
"pageview",
"screenview",
"social",
"timing",
"transaction"
]
},
"nonInteractionHit": {
"type": ["boolean", "null"]
}
},
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Schema for a Google Analytics pageview hit",
"self": {
"vendor": "com.google.analytics.measurement-protocol",
"name": "page_view",
"format": "jsonschema",
"version": "1-0-0"
},

"type": "object",
"properties": {
"documentLocationUrl": {
"type": ["string", "null"],
"maxLength": 2048
},
"documentHostName": {
"type": ["string", "null"],
"maxLength": 100
},
"documentPath": {
"type": ["string", "null"],
"maxLength": 2048
},
"documentTitle": {
"type": ["string", "null"],
"maxLength": 1500
}
},
"additionalProperties": false
}
Loading

0 comments on commit 871d400

Please sign in to comment.