Skip to content

Commit

Permalink
Validate enriched event against atomic schema before emitting (close #…
Browse files Browse the repository at this point in the history
  • Loading branch information
voropaevp authored and benjben committed Jan 19, 2022
1 parent 6b0be80 commit 40829d9
Show file tree
Hide file tree
Showing 7 changed files with 801 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import java.util.concurrent.TimeUnit

import cats.effect.{ContextShift, IO, Clock, Blocker}

import io.circe.literal._

import fs2.Stream

import com.snowplowanalytics.iglu.client.Client
Expand All @@ -43,10 +45,41 @@ class EnrichBench {

implicit val ioClock: Clock[IO] = Clock.create[IO]

// Iglu client used by the benchmarks of this class. It is built once, when the class is
// instantiated, from an inline resolver configuration that lists two HTTP repositories.
val client = {
  val resolverConfig = json"""
{
"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
"data": {
"cacheSize": 500,
"repositories": [
{
"name": "Iglu Central",
"priority": 0,
"vendorPrefixes": [ "com.snowplowanalytics" ],
"connection": {
"http": {
"uri": "http://iglucentral.com"
}
}
},
{
"name": "Iglu Central - GCP Mirror",
"priority": 1,
"vendorPrefixes": [ "com.snowplowanalytics" ],
"connection": {
"http": {
"uri": "http://mirror01.iglucentral.com"
}
}
}
]
}
}
"""

  // rethrowT moves a Left into IO's error channel, so a bad configuration fails here.
  Client
    .parseDefault[IO](resolverConfig)
    .rethrowT
    .unsafeRunSync()
}

// Benchmarks `Enrich.enrichWith` on `state.raw` with an empty enrichment registry,
// using the Iglu `client` defined on this class.
@Benchmark
def measureEnrichWithMinimalPayload(state: EnrichBench.BenchState) = {
  // enrichWith needs a ContextShift in implicit scope; it comes from the benchmark state.
  implicit val CS: ContextShift[IO] = state.contextShift
  Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), client, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync()
}

@Benchmark
Expand Down Expand Up @@ -83,19 +116,19 @@ object EnrichBench {
raw = EnrichSpec.payload[IO]

// Stream of payloads for the benchmark: one collector payload per IP below (the IP is
// prepended to the shared query string), the whole list repeated 10 times.
val input = Stream.emits(
  List("125.12.2.40", "125.12.2.41", "125.12.2.42", "125.12.2.43", "125.12.2.44").map { ip =>
    EnrichSpec.collectorPayload.copy(
      querystring = new BasicNameValuePair("ip", ip) :: EnrichSpec.querystring
    )
  }
).repeatN(10).map(cp => Payload(cp.toRaw, IO.unit)).covary[IO]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ class EtlPipelineBench {

// Benchmarks `EtlPipeline.processEvents` in `IO` on a single valid collector payload.
@Benchmark
def measureProcessEventsIO(state: EtlPipelineBench.BenchState) = {
  val payload = EnrichSpec.collectorPayload
  EtlPipeline.processEvents[IO](state.adapterRegistry, state.enrichmentRegistryIo, Client.IgluCentral, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))).unsafeRunSync()
}

// Benchmarks `EtlPipeline.processEvents` in `Id` on a single valid collector payload.
@Benchmark
def measureProcessEventsId(state: EtlPipelineBench.BenchState) = {
  val payload = EnrichSpec.collectorPayload
  EtlPipeline.processEvents[Id](state.adapterRegistry, state.enrichmentRegistryId, state.clientId, Enrich.processor, state.dateTime, Validated.Valid(Some(payload)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ThriftLoaderBench {

// Benchmarks `Enrich.serializeEnriched` on the enriched event held in the benchmark state.
@Benchmark
def measureNormalize(state: ThriftLoaderBench.BenchState) = {
  Enrich.serializeEnriched(state.event)
}
}

Expand All @@ -42,7 +42,7 @@ object ThriftLoaderBench {

@Setup(Level.Trial)
def setup(): Unit = {
data = EnrichSpec.colllectorPayload.toRaw
data = EnrichSpec.collectorPayload.toRaw

event = new EnrichedEvent()
event.setApp_id("foo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,25 @@ package enrichments
import java.nio.charset.Charset
import java.net.URI
import java.time.Instant

import java.io.{PrintWriter, StringWriter}
import org.joda.time.DateTime

import io.circe.Json

import cats.Monad
import cats.data.{EitherT, NonEmptyList, OptionT, ValidatedNel}
import cats.effect.Clock
import cats.implicits._

import com.snowplowanalytics.refererparser._

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.badrows._
import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processor}

import com.snowplowanalytics.refererparser._
import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailure

import adapters.RawEvent
import enrichments.{EventEnrichments => EE}
Expand All @@ -50,6 +49,8 @@ import utils.{IgluUtils, ConversionUtils => CU}

object EnrichmentManager {

/** Key of the schema that enriched events are validated against before being emitted */
val atomicSchema: SchemaKey =
  SchemaKey(
    vendor = "com.snowplowanalytics.snowplow",
    name = "atomic",
    format = "jsonschema",
    version = SchemaVer.Full(1, 0, 0)
  )

/**
* Run the enrichment workflow
* @param registry Contain configuration for all enrichments to apply
Expand Down Expand Up @@ -92,6 +93,7 @@ object EnrichmentManager {
enriched.pii = pii.asString
}
}
_ <- validateEnriched(enriched, raw, processor, client)
} yield enriched

/**
Expand Down Expand Up @@ -746,4 +748,55 @@ object EnrichmentManager {
Failure.EnrichmentFailures(Instant.now(), fs),
Payload.EnrichmentPayload(pee, re)
)

/**
 * Check that an enriched event can be rendered as an atomic JSON document
 * and that this document is valid against [[atomicSchema]]
 * @param enriched Enriched event to validate
 * @param raw Raw event attached to the bad row in case of failure
 * @param processor Processor attached to the bad row in case of failure
 * @param client Iglu client used to check the atomic JSON against its schema
 * @return Unit if the event is valid, otherwise an enrichment failures bad row
 *         holding a summary message followed by the underlying error
 */
private def validateEnriched[F[_]: Clock: Monad: RegistryLookup](
  enriched: EnrichedEvent,
  raw: RawEvent,
  processor: Processor,
  client: Client[F, Json]
): EitherT[F, BadRow, Unit] = {
  // Both failure paths emit the same bad row shape: a summary failure, then the detail.
  // The explicit BadRow return type keeps the error type of both steps aligned.
  def failuresBadRow(summary: String, detail: FailureDetails.EnrichmentFailureMessage): BadRow =
    EnrichmentManager.buildEnrichmentFailuresBadRow(
      NonEmptyList.of(
        EnrichmentFailure(None, FailureDetails.EnrichmentFailureMessage.Simple(summary)),
        EnrichmentFailure(None, detail)
      ),
      EnrichedEvent.toPartiallyEnrichedEvent(enriched),
      RawEvent.toRawEvent(raw),
      processor
    )

  // Step 1: pure conversion of the event to its atomic JSON form.
  val converted: Either[BadRow, Json] =
    enriched.toAtomicEvent.leftMap { err =>
      failuresBadRow(
        "Error during conversion of enriched event to the atomic format",
        FailureDetails.EnrichmentFailureMessage.Simple(cleanStackTrace(err))
      )
    }

  // Step 2: effectful validation of that JSON against the atomic schema.
  for {
    atomic <- EitherT.fromEither[F](converted)
    _ <- client
           .check(SelfDescribingData(atomicSchema, atomic))
           .leftMap { err =>
             failuresBadRow(
               s"Enriched event not valid against ${atomicSchema.toSchemaUri}",
               FailureDetails.EnrichmentFailureMessage.IgluError(atomicSchema, err)
             )
           }
  } yield ()
}

/**
 * Render a throwable as the text that `printStackTrace` would print
 * @param t Throwable to render
 * @return The printed stack trace, including any causes
 */
private def cleanStackTrace(t: Throwable): String = {
  val buffer = new StringWriter
  val printer = new PrintWriter(buffer)
  t.printStackTrace(printer)
  buffer.toString
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@ package com.snowplowanalytics.snowplow.enrich.common.outputs
import java.lang.{Integer => JInteger}
import java.lang.{Float => JFloat}
import java.lang.{Byte => JByte}
import java.time.format.DateTimeFormatter

import scala.beans.BeanProperty

import cats.implicits._

import io.circe.Json

import com.snowplowanalytics.snowplow.badrows.Payload.PartiallyEnrichedEvent

/**
Expand Down Expand Up @@ -246,9 +251,166 @@ class EnrichedEvent extends Serializable {

// Fields modified in PII enrichment (JSON String)
@BeanProperty var pii: String = _

// Pattern of the timestamp strings stored on the event ("yyyy-MM-dd HH:mm:ss.SSS").
// DateTimeFormatter is not java.io.Serializable while this class is, so the field is
// transient (skipped by Java serialization) and lazy (rebuilt on first use, including
// after deserialization, and never allocated for events that are not converted).
@transient private lazy val JsonSchemaDateTimeFormat =
  DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

/**
 * Build an optional JSON field from a possibly null value
 * @param k Name of the field
 * @param v Value of the field, may be null
 * @param f Encoder applied to the value when it is not null
 * @return None if the value is null, otherwise the field name paired with the encoded value
 */
private def toKv[T](
  k: String,
  v: T,
  f: T => Json
): Option[(String, Json)] =
  for (present <- Option(v)) yield k -> f(present)

// Typed entry points: each picks the JSON encoding for one of the field types of the event
// and delegates the null check to the generic toKv.
// Strings become JSON strings.
private def toKv(k: String, s: String): Option[(String, Json)] =
  toKv[String](k, s, str => Json.fromString(str))
// Boxed integers become JSON numbers.
private def toKv(k: String, i: JInteger): Option[(String, Json)] =
  toKv[JInteger](k, i, boxed => Json.fromInt(boxed.intValue))
// Boxed floats become JSON numbers, or JSON null when circe cannot represent the value.
private def toKv(k: String, f: JFloat): Option[(String, Json)] =
  toKv[JFloat](k, f, boxed => Json.fromFloatOrNull(boxed.floatValue))
// Boxed bytes are flags: any non-zero value becomes true.
private def toKv(k: String, b: JByte): Option[(String, Json)] =
  toKv[JByte](k, b, boxed => Json.fromBoolean(boxed.byteValue != 0))
/**
 * Build an optional JSON field from a possibly null timestamp string
 * @param k Name of the field
 * @param s Timestamp formatted as "yyyy-MM-dd HH:mm:ss.SSS", may be null
 * @return None if the timestamp is null, otherwise the field name paired with the timestamp
 *         re-rendered with `DateTimeFormatter.ISO_DATE_TIME`. Throws if the timestamp does
 *         not match the expected pattern.
 */
private def toDateKv(k: String, s: String): Option[(String, Json)] = {
  // NOTE(review): the source pattern carries no zone, so the rendered value has no offset
  // either; confirm this is what the atomic schema expects.
  val reformat: String => Json = raw => {
    val parsed = JsonSchemaDateTimeFormat.parse(raw)
    Json.fromString(DateTimeFormatter.ISO_DATE_TIME.format(parsed))
  }
  toKv(k, s, reformat)
}

/**
 * Render this event as a JSON object holding its atomic fields.
 * Fields whose value is null are left out of the object, the others appear in the order
 * listed below. Timestamp fields are re-rendered by toDateKv.
 * @return The JSON object, or the non-fatal exception thrown while building it
 *         (e.g. a timestamp that does not match the expected pattern)
 */
def toAtomicEvent: Either[Throwable, Json] =
  Either.catchNonFatal(
    Json.fromFields(
      // One list flattened once, instead of chaining `++` on every Option,
      // which rebuilt an intermediate collection for each of the fields.
      List(
        toKv("app_id", this.app_id),
        toKv("platform", this.platform),
        toDateKv("etl_tstamp", this.etl_tstamp),
        toDateKv("collector_tstamp", this.collector_tstamp),
        toDateKv("dvce_created_tstamp", this.dvce_created_tstamp),
        toKv("event", this.event),
        toKv("event_id", this.event_id),
        toKv("txn_id", this.txn_id),
        toKv("name_tracker", this.name_tracker),
        toKv("v_tracker", this.v_tracker),
        toKv("v_collector", this.v_collector),
        toKv("v_etl", this.v_etl),
        toKv("user_id", this.user_id),
        toKv("user_ipaddress", this.user_ipaddress),
        toKv("user_fingerprint", this.user_fingerprint),
        toKv("domain_userid", this.domain_userid),
        toKv("domain_sessionidx", this.domain_sessionidx),
        toKv("network_userid", this.network_userid),
        toKv("geo_country", this.geo_country),
        toKv("geo_region", this.geo_region),
        toKv("geo_city", this.geo_city),
        toKv("geo_zipcode", this.geo_zipcode),
        toKv("geo_latitude", this.geo_latitude),
        toKv("geo_longitude", this.geo_longitude),
        toKv("geo_region_name", this.geo_region_name),
        toKv("ip_isp", this.ip_isp),
        toKv("ip_organization", this.ip_organization),
        toKv("ip_domain", this.ip_domain),
        toKv("ip_netspeed", this.ip_netspeed),
        toKv("page_url", this.page_url),
        toKv("page_title", this.page_title),
        toKv("page_referrer", this.page_referrer),
        toKv("page_urlscheme", this.page_urlscheme),
        toKv("page_urlhost", this.page_urlhost),
        toKv("page_urlport", this.page_urlport),
        toKv("page_urlpath", this.page_urlpath),
        toKv("page_urlquery", this.page_urlquery),
        toKv("page_urlfragment", this.page_urlfragment),
        toKv("refr_urlscheme", this.refr_urlscheme),
        toKv("refr_urlhost", this.refr_urlhost),
        toKv("refr_urlport", this.refr_urlport),
        toKv("refr_urlpath", this.refr_urlpath),
        toKv("refr_urlquery", this.refr_urlquery),
        toKv("refr_urlfragment", this.refr_urlfragment),
        toKv("refr_medium", this.refr_medium),
        toKv("refr_source", this.refr_source),
        toKv("refr_term", this.refr_term),
        toKv("mkt_medium", this.mkt_medium),
        toKv("mkt_source", this.mkt_source),
        toKv("mkt_term", this.mkt_term),
        toKv("mkt_content", this.mkt_content),
        toKv("mkt_campaign", this.mkt_campaign),
        toKv("se_category", this.se_category),
        toKv("se_action", this.se_action),
        toKv("se_label", this.se_label),
        toKv("se_property", this.se_property),
        toKv("se_value", this.se_value),
        toKv("tr_orderid", this.tr_orderid),
        toKv("tr_affiliation", this.tr_affiliation),
        toKv("tr_total", this.tr_total),
        toKv("tr_tax", this.tr_tax),
        toKv("tr_shipping", this.tr_shipping),
        toKv("tr_city", this.tr_city),
        toKv("tr_state", this.tr_state),
        toKv("tr_country", this.tr_country),
        toKv("ti_orderid", this.ti_orderid),
        toKv("ti_sku", this.ti_sku),
        toKv("ti_name", this.ti_name),
        toKv("ti_category", this.ti_category),
        toKv("ti_price", this.ti_price),
        toKv("ti_quantity", this.ti_quantity),
        toKv("pp_xoffset_min", this.pp_xoffset_min),
        toKv("pp_xoffset_max", this.pp_xoffset_max),
        toKv("pp_yoffset_min", this.pp_yoffset_min),
        toKv("pp_yoffset_max", this.pp_yoffset_max),
        toKv("useragent", this.useragent),
        toKv("br_name", this.br_name),
        toKv("br_family", this.br_family),
        toKv("br_version", this.br_version),
        toKv("br_type", this.br_type),
        toKv("br_renderengine", this.br_renderengine),
        toKv("br_lang", this.br_lang),
        toKv("br_features_pdf", this.br_features_pdf),
        toKv("br_features_flash", this.br_features_flash),
        toKv("br_features_java", this.br_features_java),
        toKv("br_features_director", this.br_features_director),
        toKv("br_features_quicktime", this.br_features_quicktime),
        toKv("br_features_realplayer", this.br_features_realplayer),
        toKv("br_features_windowsmedia", this.br_features_windowsmedia),
        toKv("br_features_gears", this.br_features_gears),
        toKv("br_features_silverlight", this.br_features_silverlight),
        toKv("br_cookies", this.br_cookies),
        toKv("br_colordepth", this.br_colordepth),
        toKv("br_viewwidth", this.br_viewwidth),
        toKv("br_viewheight", this.br_viewheight),
        toKv("os_name", this.os_name),
        toKv("os_family", this.os_family),
        toKv("os_manufacturer", this.os_manufacturer),
        toKv("os_timezone", this.os_timezone),
        toKv("dvce_type", this.dvce_type),
        toKv("dvce_ismobile", this.dvce_ismobile),
        toKv("dvce_screenwidth", this.dvce_screenwidth),
        toKv("dvce_screenheight", this.dvce_screenheight),
        toKv("doc_charset", this.doc_charset),
        toKv("doc_width", this.doc_width),
        toKv("doc_height", this.doc_height),
        toKv("tr_currency", this.tr_currency),
        toKv("tr_total_base", this.tr_total_base),
        toKv("tr_tax_base", this.tr_tax_base),
        toKv("tr_shipping_base", this.tr_shipping_base),
        toKv("ti_currency", this.ti_currency),
        toKv("ti_price_base", this.ti_price_base),
        toKv("base_currency", this.base_currency),
        toKv("geo_timezone", this.geo_timezone),
        toKv("mkt_clickid", this.mkt_clickid),
        toKv("mkt_network", this.mkt_network),
        toKv("etl_tags", this.etl_tags),
        toDateKv("dvce_sent_tstamp", this.dvce_sent_tstamp),
        toKv("refr_domain_userid", this.refr_domain_userid),
        toDateKv("refr_dvce_tstamp", this.refr_dvce_tstamp),
        toKv("domain_sessionid", this.domain_sessionid),
        toDateKv("derived_tstamp", this.derived_tstamp),
        toKv("event_vendor", this.event_vendor),
        toKv("event_name", this.event_name),
        toKv("event_format", this.event_format),
        toKv("event_version", this.event_version),
        toKv("event_fingerprint", this.event_fingerprint),
        toDateKv("true_tstamp", this.true_tstamp)
      ).flatten
    )
  )

}

object EnrichedEvent {

def toPartiallyEnrichedEvent(enrichedEvent: EnrichedEvent): PartiallyEnrichedEvent =
PartiallyEnrichedEvent(
app_id = Option(enrichedEvent.app_id),
Expand Down
Loading

0 comments on commit 40829d9

Please sign in to comment.