diff --git a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala index dcd6d9d28..ffe6715ce 100644 --- a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala +++ b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala @@ -18,6 +18,8 @@ import java.util.concurrent.TimeUnit import cats.effect.{ContextShift, IO, Clock, Blocker} +import io.circe.literal._ + import fs2.Stream import com.snowplowanalytics.iglu.client.Client @@ -43,10 +45,41 @@ class EnrichBench { implicit val ioClock: Clock[IO] = Clock.create[IO] + val client = Client.parseDefault[IO](json""" + { + "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1", + "data": { + "cacheSize": 500, + "repositories": [ + { + "name": "Iglu Central", + "priority": 0, + "vendorPrefixes": [ "com.snowplowanalytics" ], + "connection": { + "http": { + "uri": "http://iglucentral.com" + } + } + }, + { + "name": "Iglu Central - GCP Mirror", + "priority": 1, + "vendorPrefixes": [ "com.snowplowanalytics" ], + "connection": { + "http": { + "uri": "http://mirror01.iglucentral.com" + } + } + } + ] + } + } + """).rethrowT.unsafeRunSync() + @Benchmark def measureEnrichWithMinimalPayload(state: EnrichBench.BenchState) = { implicit val CS: ContextShift[IO] = state.contextShift - Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), state.blocker, Client.IgluCentral, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync() + Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), client, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync() } @Benchmark @@ -83,19 +116,19 @@ object EnrichBench { raw = EnrichSpec.payload[IO] val input = Stream.emits(List( - EnrichSpec.colllectorPayload.copy( + EnrichSpec.collectorPayload.copy( querystring = new BasicNameValuePair("ip", "125.12.2.40") :: 
EnrichSpec.querystring ), - EnrichSpec.colllectorPayload.copy( + EnrichSpec.collectorPayload.copy( querystring = new BasicNameValuePair("ip", "125.12.2.41") :: EnrichSpec.querystring ), - EnrichSpec.colllectorPayload.copy( + EnrichSpec.collectorPayload.copy( querystring = new BasicNameValuePair("ip", "125.12.2.42") :: EnrichSpec.querystring ), - EnrichSpec.colllectorPayload.copy( + EnrichSpec.collectorPayload.copy( querystring = new BasicNameValuePair("ip", "125.12.2.43") :: EnrichSpec.querystring ), - EnrichSpec.colllectorPayload.copy( + EnrichSpec.collectorPayload.copy( querystring = new BasicNameValuePair("ip", "125.12.2.44") :: EnrichSpec.querystring ), )).repeatN(10).map(cp => Payload(cp.toRaw, IO.unit)).covary[IO] diff --git a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala index b7c2d0b96..89a0e34b0 100644 --- a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala +++ b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala @@ -49,13 +49,13 @@ class EtlPipelineBench { @Benchmark def measureProcessEventsIO(state: EtlPipelineBench.BenchState) = { - val payload = EnrichSpec.colllectorPayload + val payload = EnrichSpec.collectorPayload EtlPipeline.processEvents[IO](state.adapterRegistry, state.enrichmentRegistryIo, Client.IgluCentral, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))).unsafeRunSync() } @Benchmark def measureProcessEventsId(state: EtlPipelineBench.BenchState) = { - val payload = EnrichSpec.colllectorPayload + val payload = EnrichSpec.collectorPayload EtlPipeline.processEvents[Id](state.adapterRegistry, state.enrichmentRegistryId, state.clientId, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))) } } diff --git 
a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala index b1b9e388f..0eea9f9f4 100644 --- a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala +++ b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala @@ -30,7 +30,7 @@ class ThriftLoaderBench { @Benchmark def measureNormalize(state: ThriftLoaderBench.BenchState) = { - Enrich.encodeEvent(state.event) + Enrich.serializeEnriched(state.event) } } @@ -42,7 +42,7 @@ object ThriftLoaderBench { @Setup(Level.Trial) def setup(): Unit = { - data = EnrichSpec.colllectorPayload.toRaw + data = EnrichSpec.collectorPayload.toRaw event = new EnrichedEvent() event.setApp_id("foo") diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala index 482b031b3..c86c59668 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala @@ -16,26 +16,25 @@ package enrichments import java.nio.charset.Charset import java.net.URI import java.time.Instant - +import java.io.{PrintWriter, StringWriter} import org.joda.time.DateTime - import io.circe.Json - import cats.Monad import cats.data.{EitherT, NonEmptyList, OptionT, ValidatedNel} import cats.effect.Clock import cats.implicits._ +import com.snowplowanalytics.refererparser._ + import com.snowplowanalytics.iglu.client.Client import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -import com.snowplowanalytics.iglu.core.SelfDescribingData +import com.snowplowanalytics.iglu.core.{SchemaKey, 
SchemaVer, SelfDescribingData} import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.badrows._ import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processor} - -import com.snowplowanalytics.refererparser._ +import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailure import adapters.RawEvent import enrichments.{EventEnrichments => EE} @@ -50,6 +49,8 @@ import utils.{IgluUtils, ConversionUtils => CU} object EnrichmentManager { + val atomicSchema: SchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1, 0, 0)) + /** * Run the enrichment workflow * @param registry Contain configuration for all enrichments to apply @@ -92,6 +93,7 @@ object EnrichmentManager { enriched.pii = pii.asString } } + _ <- validateEnriched(enriched, raw, processor, client) } yield enriched /** @@ -746,4 +748,55 @@ object EnrichmentManager { Failure.EnrichmentFailures(Instant.now(), fs), Payload.EnrichmentPayload(pee, re) ) + + private def validateEnriched[F[_]: Clock: Monad: RegistryLookup]( + enriched: EnrichedEvent, + raw: RawEvent, + processor: Processor, + client: Client[F, Json] + ): EitherT[F, BadRow, Unit] = + enriched.toAtomicEvent + .leftMap(err => + EnrichmentManager.buildEnrichmentFailuresBadRow( + NonEmptyList( + EnrichmentFailure( + None, + FailureDetails.EnrichmentFailureMessage.Simple( + "Error during conversion of enriched event to the atomic format" + ) + ), + List(EnrichmentFailure(None, FailureDetails.EnrichmentFailureMessage.Simple(s"${cleanStackTrace(err)}"))) + ), + EnrichedEvent.toPartiallyEnrichedEvent(enriched), + RawEvent.toRawEvent(raw), + processor + ) + ) + .toEitherT[F] + .flatMap(atomic => + client + .check(SelfDescribingData(atomicSchema, atomic)) + .leftMap(err => + EnrichmentManager.buildEnrichmentFailuresBadRow( + NonEmptyList( + EnrichmentFailure( + None, + FailureDetails.EnrichmentFailureMessage.Simple( + s"Enriched event not valid against 
${atomicSchema.toSchemaUri}" + ) + ), + List(EnrichmentFailure(None, FailureDetails.EnrichmentFailureMessage.IgluError(atomicSchema, err))) + ), + EnrichedEvent.toPartiallyEnrichedEvent(enriched), + RawEvent.toRawEvent(raw), + processor + ) + ) + ) + + private def cleanStackTrace(t: Throwable): String = { + val sw = new StringWriter + t.printStackTrace(new PrintWriter(sw)) + sw.toString + } } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala index 543a837ba..d99775cd6 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala @@ -15,9 +15,14 @@ package com.snowplowanalytics.snowplow.enrich.common.outputs import java.lang.{Integer => JInteger} import java.lang.{Float => JFloat} import java.lang.{Byte => JByte} +import java.time.format.DateTimeFormatter import scala.beans.BeanProperty +import cats.implicits._ + +import io.circe.Json + import com.snowplowanalytics.snowplow.badrows.Payload.PartiallyEnrichedEvent /** @@ -246,9 +251,167 @@ class EnrichedEvent extends Serializable { // Fields modified in PII enrichemnt (JSON String) @BeanProperty var pii: String = _ + + /* Transient and lazy: DateTimeFormatter is not java.io.Serializable, so a plain val would make Java serialization of this Serializable class fail; the formatter is rebuilt on first use after deserialization. */ + @transient private lazy val JsonSchemaDateTimeFormat = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") + + private def toKv[T]( + k: String, + v: T, + f: T => Json + ): Option[(String, Json)] = + Option(v).map(value => (k, f(value))) + + private def toKv(k: String, s: String): Option[(String, Json)] = toKv(k, s, Json.fromString) + private def toKv(k: String, i: JInteger): Option[(String, Json)] = toKv(k, i, (jInt: JInteger) => Json.fromInt(jInt)) + private def toKv(k: String, f: JFloat): Option[(String, Json)] = toKv(k, f, (jFloat: JFloat) => Json.fromFloatOrNull(jFloat)) + private
def toKv(k: String, b: JByte): Option[(String, Json)] = toKv(k, b, (jByte: JByte) => Json.fromBoolean(jByte != 0)) + private def toDateKv(k: String, s: String): Option[(String, Json)] = + toKv( + k, + s, + (s: String) => Json.fromString(DateTimeFormatter.ISO_DATE_TIME.format(JsonSchemaDateTimeFormat.parse(s))) + ) + + def toAtomicEvent: Either[Throwable, Json] = + Either.catchNonFatal( + Json.fromFields( + toKv("app_id", this.app_id) ++ + toKv("platform", this.platform) ++ + toDateKv("etl_tstamp", this.etl_tstamp) ++ + toDateKv("collector_tstamp", this.collector_tstamp) ++ + toDateKv("dvce_created_tstamp", this.dvce_created_tstamp) ++ + toKv("event", this.event) ++ + toKv("event_id", this.event_id) ++ + toKv("txn_id", this.txn_id) ++ + toKv("name_tracker", this.name_tracker) ++ + toKv("v_tracker", this.v_tracker) ++ + toKv("v_collector", this.v_collector) ++ + toKv("v_etl", this.v_etl) ++ + toKv("user_id", this.user_id) ++ + toKv("user_ipaddress", this.user_ipaddress) ++ + toKv("user_fingerprint", this.user_fingerprint) ++ + toKv("domain_userid", this.domain_userid) ++ + toKv("domain_sessionidx", this.domain_sessionidx) ++ + toKv("network_userid", this.network_userid) ++ + toKv("geo_country", this.geo_country) ++ + toKv("geo_region", this.geo_region) ++ + toKv("geo_city", this.geo_city) ++ + toKv("geo_zipcode", this.geo_zipcode) ++ + toKv("geo_latitude", this.geo_latitude) ++ + toKv("geo_longitude", this.geo_longitude) ++ + toKv("geo_region_name", this.geo_region_name) ++ + toKv("ip_isp", this.ip_isp) ++ + toKv("ip_organization", this.ip_organization) ++ + toKv("ip_domain", this.ip_domain) ++ + toKv("ip_netspeed", this.ip_netspeed) ++ + toKv("page_url", this.page_url) ++ + toKv("page_title", this.page_title) ++ + toKv("page_referrer", this.page_referrer) ++ + toKv("page_urlscheme", this.page_urlscheme) ++ + toKv("page_urlhost", this.page_urlhost) ++ + toKv("page_urlport", this.page_urlport) ++ + toKv("page_urlpath", this.page_urlpath) ++ + toKv("page_urlquery", 
this.page_urlquery) ++ + toKv("page_urlfragment", this.page_urlfragment) ++ + toKv("refr_urlscheme", this.refr_urlscheme) ++ + toKv("refr_urlhost", this.refr_urlhost) ++ + toKv("refr_urlport", this.refr_urlport) ++ + toKv("refr_urlpath", this.refr_urlpath) ++ + toKv("refr_urlquery", this.refr_urlquery) ++ + toKv("refr_urlfragment", this.refr_urlfragment) ++ + toKv("refr_medium", this.refr_medium) ++ + toKv("refr_source", this.refr_source) ++ + toKv("refr_term", this.refr_term) ++ + toKv("mkt_medium", this.mkt_medium) ++ + toKv("mkt_source", this.mkt_source) ++ + toKv("mkt_term", this.mkt_term) ++ + toKv("mkt_content", this.mkt_content) ++ + toKv("mkt_campaign", this.mkt_campaign) ++ + toKv("se_category", this.se_category) ++ + toKv("se_action", this.se_action) ++ + toKv("se_label", this.se_label) ++ + toKv("se_property", this.se_property) ++ + toKv("se_value", this.se_value) ++ + toKv("tr_orderid", this.tr_orderid) ++ + toKv("tr_affiliation", this.tr_affiliation) ++ + toKv("tr_total", this.tr_total) ++ + toKv("tr_tax", this.tr_tax) ++ + toKv("tr_shipping", this.tr_shipping) ++ + toKv("tr_city", this.tr_city) ++ + toKv("tr_state", this.tr_state) ++ + toKv("tr_country", this.tr_country) ++ + toKv("ti_orderid", this.ti_orderid) ++ + toKv("ti_sku", this.ti_sku) ++ + toKv("ti_name", this.ti_name) ++ + toKv("ti_category", this.ti_category) ++ + toKv("ti_price", this.ti_price) ++ + toKv("ti_quantity", this.ti_quantity) ++ + toKv("pp_xoffset_min", this.pp_xoffset_min) ++ + toKv("pp_xoffset_max", this.pp_xoffset_max) ++ + toKv("pp_yoffset_min", this.pp_yoffset_min) ++ + toKv("pp_yoffset_max", this.pp_yoffset_max) ++ + toKv("useragent", this.useragent) ++ + toKv("br_name", this.br_name) ++ + toKv("br_family", this.br_family) ++ + toKv("br_version", this.br_version) ++ + toKv("br_type", this.br_type) ++ + toKv("br_renderengine", this.br_renderengine) ++ + toKv("br_lang", this.br_lang) ++ + toKv("br_features_pdf", this.br_features_pdf) ++ + toKv("br_features_flash", 
this.br_features_flash) ++ + toKv("br_features_java", this.br_features_java) ++ + toKv("br_features_director", this.br_features_director) ++ + toKv("br_features_quicktime", this.br_features_quicktime) ++ + toKv("br_features_realplayer", this.br_features_realplayer) ++ + toKv("br_features_windowsmedia", this.br_features_windowsmedia) ++ + toKv("br_features_gears", this.br_features_gears) ++ + toKv("br_features_silverlight", this.br_features_silverlight) ++ + toKv("br_cookies", this.br_cookies) ++ + toKv("br_colordepth", this.br_colordepth) ++ + toKv("br_viewwidth", this.br_viewwidth) ++ + toKv("br_viewheight", this.br_viewheight) ++ + toKv("os_name", this.os_name) ++ + toKv("os_family", this.os_family) ++ + toKv("os_manufacturer", this.os_manufacturer) ++ + toKv("os_timezone", this.os_timezone) ++ + toKv("dvce_type", this.dvce_type) ++ + toKv("dvce_ismobile", this.dvce_ismobile) ++ + toKv("dvce_screenwidth", this.dvce_screenwidth) ++ + toKv("dvce_screenheight", this.dvce_screenheight) ++ + toKv("doc_charset", this.doc_charset) ++ + toKv("doc_width", this.doc_width) ++ + toKv("doc_height", this.doc_height) ++ + toKv("tr_currency", this.tr_currency) ++ + toKv("tr_total_base", this.tr_total_base) ++ + toKv("tr_tax_base", this.tr_tax_base) ++ + toKv("tr_shipping_base", this.tr_shipping_base) ++ + toKv("ti_currency", this.ti_currency) ++ + toKv("ti_price_base", this.ti_price_base) ++ + toKv("base_currency", this.base_currency) ++ + toKv("geo_timezone", this.geo_timezone) ++ + toKv("mkt_clickid", this.mkt_clickid) ++ + toKv("mkt_network", this.mkt_network) ++ + toKv("etl_tags", this.etl_tags) ++ + toDateKv("dvce_sent_tstamp", this.dvce_sent_tstamp) ++ + toKv("refr_domain_userid", this.refr_domain_userid) ++ + toDateKv("refr_dvce_tstamp", this.refr_dvce_tstamp) ++ + toKv("domain_sessionid", this.domain_sessionid) ++ + toDateKv("derived_tstamp", this.derived_tstamp) ++ + toKv("event_vendor", this.event_vendor) ++ + toKv("event_name", this.event_name) ++ + 
toKv("event_format", this.event_format) ++ + toKv("event_version", this.event_version) ++ + toKv("event_fingerprint", this.event_fingerprint) ++ + toDateKv("true_tstamp", this.true_tstamp) + ) + ) + } object EnrichedEvent { + def toPartiallyEnrichedEvent(enrichedEvent: EnrichedEvent): PartiallyEnrichedEvent = PartiallyEnrichedEvent( app_id = Option(enrichedEvent.app_id), diff --git a/modules/common/src/test/resources/iglu-schemas/schemas/com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0 b/modules/common/src/test/resources/iglu-schemas/schemas/com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0 new file mode 100644 index 000000000..0b7ff5790 --- /dev/null +++ b/modules/common/src/test/resources/iglu-schemas/schemas/com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0 @@ -0,0 +1,489 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "description": "Schema for an atomic canonical Snowplow event", + "self": { + "vendor": "com.snowplowanalytics.snowplow", + "name": "atomic", + "format": "jsonschema", + "version": "1-0-0" + }, + "type": "object", + "properties": { + "app_id": { + "type": ["string", "null"], + "maxLength": 255 + }, + "platform": { + "type": ["string", "null"], + "maxLength": 255 + }, + "etl_tstamp": { + "type": ["string", "null"], + "format": "date-time" + }, + "collector_tstamp": { + "type": "string", + "format": "date-time" + }, + "dvce_created_tstamp": { + "type": ["string", "null"], + "format": "date-time" + }, + "event": { + "type": ["string", "null"], + "maxLength": 128 + }, + "event_id": { + "type": "string", + "maxLength": 36 + }, + "txn_id": { + "type": ["integer", "null"] + }, + "name_tracker": { + "type": ["string", "null"], + "maxLength": 128 + }, + "v_tracker": { + "type": ["string", "null"], + "maxLength": 100 + }, + "v_collector": { + "type": "string", + "maxLength": 100 + }, + "v_etl": { + "type": "string", + "maxLength": 100 + }, + "user_id": { + "type": ["string", 
"null"], + "maxLength": 255 + }, + "user_ipaddress": { + "type": ["string", "null"], + "maxLength": 128 + }, + "user_fingerprint": { + "type": ["string", "null"], + "maxLength": 128 + }, + "domain_userid": { + "type": ["string", "null"], + "maxLength": 128 + }, + "domain_sessionidx": { + "type": ["integer", "null"] + }, + "network_userid": { + "type": ["string", "null"], + "maxLength": 128 + }, + "geo_country": { + "type": ["string", "null"], + "maxLength": 2 + }, + "geo_region": { + "type": ["string", "null"], + "maxLength": 3 + }, + "geo_city": { + "type": ["string", "null"], + "maxLength": 75 + }, + "geo_zipcode": { + "type": ["string", "null"], + "maxLength": 15 + }, + "geo_latitude": { + "type": ["number", "null"] + }, + "geo_longitude": { + "type": ["number", "null"] + }, + "geo_region_name": { + "type": ["string", "null"], + "maxLength": 100 + }, + "ip_isp": { + "type": ["string", "null"], + "maxLength": 100 + }, + "ip_organization": { + "type": ["string", "null"], + "maxLength": 128 + }, + "ip_domain": { + "type": ["string", "null"], + "maxLength": 128 + }, + "ip_netspeed": { + "type": ["string", "null"], + "maxLength": 100 + }, + "page_url": { + "type": ["string", "null"], + "maxLength": 4096 + }, + "page_title": { + "type": ["string", "null"], + "maxLength": 2000 + }, + "page_referrer": { + "type": ["string", "null"], + "maxLength": 4096 + }, + "page_urlscheme": { + "type": ["string", "null"], + "maxLength": 16 + }, + "page_urlhost": { + "type": ["string", "null"], + "maxLength": 255 + }, + "page_urlport": { + "type": ["integer", "null"] + }, + "page_urlpath": { + "type": ["string", "null"], + "maxLength": 3000 + }, + "page_urlquery": { + "type": ["string", "null"], + "maxLength": 6000 + }, + "page_urlfragment": { + "type": ["string", "null"], + "maxLength": 3000 + }, + "refr_urlscheme": { + "type": ["string", "null"], + "maxLength": 16 + }, + "refr_urlhost": { + "type": ["string", "null"], + "maxLength": 255 + }, + "refr_urlport": { + "type": ["integer", 
"null"] + }, + "refr_urlpath": { + "type": ["string", "null"], + "maxLength": 6000 + }, + "refr_urlquery": { + "type": ["string", "null"], + "maxLength": 6000 + }, + "refr_urlfragment": { + "type": ["string", "null"], + "maxLength": 3000 + }, + "refr_medium": { + "type": ["string", "null"], + "maxLength": 25 + }, + "refr_source": { + "type": ["string", "null"], + "maxLength": 50 + }, + "refr_term": { + "type": ["string", "null"], + "maxLength": 255 + }, + "mkt_medium": { + "type": ["string", "null"], + "maxLength": 255 + }, + "mkt_source": { + "type": ["string", "null"], + "maxLength": 255 + }, + "mkt_term": { + "type": ["string", "null"], + "maxLength": 255 + }, + "mkt_content": { + "type": ["string", "null"], + "maxLength": 500 + }, + "mkt_campaign": { + "type": ["string", "null"], + "maxLength": 255 + }, + "se_category": { + "type": ["string", "null"], + "maxLength": 1000 + }, + "se_action": { + "type": ["string", "null"], + "maxLength": 1000 + }, + "se_label": { + "type": ["string", "null"], + "maxLength": 4096 + }, + "se_property": { + "type": ["string", "null"], + "maxLength": 1000 + }, + "se_value": { + "type": ["number", "null"] + }, + "tr_orderid": { + "type": ["string", "null"], + "maxLength": 255 + }, + "tr_affiliation": { + "type": ["string", "null"], + "maxLength": 255 + }, + "tr_total": { + "type": ["number", "null"] + }, + "tr_tax": { + "type": ["number", "null"] + }, + "tr_shipping": { + "type": ["number", "null"] + }, + "tr_city": { + "type": ["string", "null"], + "maxLength": 255 + }, + "tr_state": { + "type": ["string", "null"], + "maxLength": 255 + }, + "tr_country": { + "type": ["string", "null"], + "maxLength": 255 + }, + "ti_orderid": { + "type": ["string", "null"], + "maxLength": 255 + }, + "ti_sku": { + "type": ["string", "null"], + "maxLength": 255 + }, + "ti_name": { + "type": ["string", "null"], + "maxLength": 255 + }, + "ti_category": { + "type": ["string", "null"], + "maxLength": 255 + }, + "ti_price": { + "type": ["number", "null"] + 
}, + "ti_quantity": { + "type": ["integer", "null"] + }, + "pp_xoffset_min": { + "type": ["integer", "null"] + }, + "pp_xoffset_max": { + "type": ["integer", "null"] + }, + "pp_yoffset_min": { + "type": ["integer", "null"] + }, + "pp_yoffset_max": { + "type": ["integer", "null"] + }, + "useragent": { + "type": ["string", "null"], + "maxLength": 1000 + }, + "br_name": { + "type": ["string", "null"], + "maxLength": 50 + }, + "br_family": { + "type": ["string", "null"], + "maxLength": 50 + }, + "br_version": { + "type": ["string", "null"], + "maxLength": 50 + }, + "br_type": { + "type": ["string", "null"], + "maxLength": 50 + }, + "br_renderengine": { + "type": ["string", "null"], + "maxLength": 50 + }, + "br_lang": { + "type": ["string", "null"], + "maxLength": 255 + }, + "br_features_pdf": { + "type": ["boolean", "null"] + }, + "br_features_flash": { + "type": ["boolean", "null"] + }, + "br_features_java": { + "type": ["boolean", "null"] + }, + "br_features_director": { + "type": ["boolean", "null"] + }, + "br_features_quicktime": { + "type": ["boolean", "null"] + }, + "br_features_realplayer": { + "type": ["boolean", "null"] + }, + "br_features_windowsmedia": { + "type": ["boolean", "null"] + }, + "br_features_gears": { + "type": ["boolean", "null"] + }, + "br_features_silverlight": { + "type": ["boolean", "null"] + }, + "br_cookies": { + "type": ["boolean", "null"] + }, + "br_colordepth": { + "type": ["string", "null"], + "maxLength": 12 + }, + "br_viewwidth": { + "type": ["integer", "null"] + }, + "br_viewheight": { + "type": ["integer", "null"] + }, + "os_name": { + "type": ["string", "null"], + "maxLength": 50 + }, + "os_family": { + "type": ["string", "null"], + "maxLength": 50 + }, + "os_manufacturer": { + "type": ["string", "null"], + "maxLength": 50 + }, + "os_timezone": { + "type": ["string", "null"], + "maxLength": 255 + }, + "dvce_type": { + "type": ["string", "null"], + "maxLength": 50 + }, + "dvce_ismobile": { + "type": ["boolean", "null"] + }, + 
"dvce_screenwidth": { + "type": ["integer", "null"] + }, + "dvce_screenheight": { + "type": ["integer", "null"] + }, + "doc_charset": { + "type": ["string", "null"], + "maxLength": 128 + }, + "doc_width": { + "type": ["integer", "null"] + }, + "doc_height": { + "type": ["integer", "null"] + }, + "tr_currency": { + "type": ["string", "null"], + "maxLength": 3 + }, + "tr_total_base": { + "type": ["number", "null"] + }, + "tr_tax_base": { + "type": ["number", "null"] + }, + "tr_shipping_base": { + "type": ["number", "null"] + }, + "ti_currency": { + "type": ["string", "null"], + "maxLength": 3 + }, + "ti_price_base": { + "type": ["number", "null"] + }, + "base_currency": { + "type": ["string", "null"], + "maxLength": 3 + }, + "geo_timezone": { + "type": ["string", "null"], + "maxLength": 64 + }, + "mkt_clickid": { + "type": ["string", "null"], + "maxLength": 128 + }, + "mkt_network": { + "type": ["string", "null"], + "maxLength": 64 + }, + "etl_tags": { + "type": ["string", "null"], + "maxLength": 500 + }, + "dvce_sent_tstamp": { + "type": ["string", "null"], + "format": "date-time" + }, + "refr_domain_userid": { + "type": ["string", "null"], + "maxLength": 128 + }, + "refr_dvce_tstamp": { + "type": ["string", "null"], + "format": "date-time" + }, + "domain_sessionid": { + "type": ["string", "null"], + "maxLength": 128 + }, + "derived_tstamp": { + "type": ["string", "null"], + "format": "date-time" + }, + "event_vendor": { + "type": ["string", "null"], + "maxLength": 1000 + }, + "event_name": { + "type": ["string", "null"], + "maxLength": 1000 + }, + "event_format": { + "type": ["string", "null"], + "maxLength": 128 + }, + "event_version": { + "type": ["string", "null"], + "maxLength": 128 + }, + "event_fingerprint": { + "type": ["string", "null"], + "maxLength": 128 + }, + "true_tstamp": { + "type": ["string", "null"], + "format": "date-time" + } + }, + "additionalProperties": false +} \ No newline at end of file diff --git 
a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala index 6cb2b608b..972618a70 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala @@ -20,6 +20,7 @@ import cats.data.NonEmptyList import io.circe.literal._ import org.joda.time.DateTime import com.snowplowanalytics.snowplow.badrows._ +import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailureMessage import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer} import loaders._ import adapters.RawEvent @@ -36,7 +37,6 @@ import enrichments.registry.{IabEnrichment, JavascriptScriptEnrichment, YauaaEnr import org.apache.commons.codec.digest.DigestUtils import org.specs2.mutable.Specification import org.specs2.matcher.EitherMatchers - import SpecHelpers._ class EnrichmentManagerSpec extends Specification with EitherMatchers { @@ -686,6 +686,40 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers { EnrichmentManager.getCollectorVersionSet(input) must beRight(()) } } + + "validateEnriched" should { + "create a bad row if a field is oversized (tv)" >> { + EnrichmentManager + .enrichEvent( + enrichmentReg, + client, + processor, + timestamp, + RawEvent(api, fatBody, None, source, context) + ) + .swap + .map { + case BadRow.EnrichmentFailures(_, failure, _) => + failure.messages.map(_.message match { + case EnrichmentFailureMessage.Simple(error) => error + case EnrichmentFailureMessage.IgluError(schemaKey, _) => schemaKey + case _ => None + }) + case _ => None + } + .getOrElse(None) === NonEmptyList( + s"Enriched event not valid against ${EnrichmentManager.atomicSchema.toSchemaUri}", + 
List(EnrichmentManager.atomicSchema) + ) + } + + "allow normal raw events" >> { + EnrichmentManager + .enrichEvent(enrichmentReg, client, processor, timestamp, RawEvent(api, leanBody, None, source, context)) + .map(_ => true) + .getOrElse(false) must beTrue + } + } } object EnrichmentManagerSpec { @@ -706,6 +740,18 @@ object EnrichmentManagerSpec { None ) + val leanBody = Map( + "e" -> "pp", + "tv" -> "js-0.13.1", + "p" -> "web" + ).toOpt + + val fatBody = Map( + "e" -> "pp", + "tv" -> s"${"s" * 500}", + "p" -> "web" + ).toOpt + val iabEnrichment = IabEnrichment .parse( json"""{ @@ -739,4 +785,5 @@ object EnrichmentManagerSpec { .getOrElse(throw new RuntimeException("IAB enrichment couldn't be initialised")) // to make sure it's not none .enrichment[Id] .some + }