From 1fe5371920dbb5af849c2cab88aa5343b7158aa0 Mon Sep 17 00:00:00 2001 From: "pavel.voropaev" Date: Fri, 3 Dec 2021 16:31:46 +0000 Subject: [PATCH] Enrichment to enforce maximum field lengths (close #517) --- .../EnrichBench.scala | 45 +- .../EtlPipelineBench.scala | 4 +- .../ThriftLoaderBench.scala | 4 +- .../enrichments/EnrichmentManager.scala | 3 + .../enrichments/EnrichmentRegistry.scala | 38 +- .../enrichments/registry/EnrichmentConf.scala | 4 + .../ShreddedValidatorEnrichment.scala | 112 ++++ .../common/outputs/EnrichedEvent.scala | 159 +++++- .../atomic/jsonschema/1-0-0 | 489 ++++++++++++++++++ .../ShreddedValidatorEnrichmentSpec.scala | 154 ++++++ 10 files changed, 983 insertions(+), 29 deletions(-) create mode 100644 modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/ShreddedValidatorEnrichment.scala create mode 100644 modules/common/src/test/resources/iglu-schemas/schemas/com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0 create mode 100644 modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/ShreddedValidatorEnrichmentSpec.scala diff --git a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala index dcd6d9d28..ffe6715ce 100644 --- a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala +++ b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala @@ -18,6 +18,8 @@ import java.util.concurrent.TimeUnit import cats.effect.{ContextShift, IO, Clock, Blocker} +import io.circe.literal._ + import fs2.Stream import com.snowplowanalytics.iglu.client.Client @@ -43,10 +45,41 @@ class EnrichBench { implicit val ioClock: Clock[IO] = Clock.create[IO] + val client = Client.parseDefault[IO](json""" + { + "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1", + "data": { + "cacheSize": 500, + "repositories": [ + { + "name": "Iglu Central", + "priority": 0, + "vendorPrefixes": [ "com.snowplowanalytics" ], + "connection": { + "http": { + "uri": "http://iglucentral.com" + } + } + }, + { + "name": "Iglu Central - GCP Mirror", + "priority": 1, + "vendorPrefixes": [ "com.snowplowanalytics" ], + "connection": { + "http": { + "uri": "http://mirror01.iglucentral.com" + } + } + } + ] + } + } + """).rethrowT.unsafeRunSync() + @Benchmark def measureEnrichWithMinimalPayload(state: EnrichBench.BenchState) = { implicit val CS: ContextShift[IO] = state.contextShift - Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), state.blocker, Client.IgluCentral, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync() + Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), client, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync() } @Benchmark @@ -83,19 +116,19 @@ object EnrichBench { raw = EnrichSpec.payload[IO] val input = Stream.emits(List( - EnrichSpec.colllectorPayload.copy( + EnrichSpec.collectorPayload.copy( querystring = new BasicNameValuePair("ip", "125.12.2.40") :: EnrichSpec.querystring ), - EnrichSpec.colllectorPayload.copy( + EnrichSpec.collectorPayload.copy( querystring = new BasicNameValuePair("ip", "125.12.2.41") :: EnrichSpec.querystring ), - EnrichSpec.colllectorPayload.copy( + EnrichSpec.collectorPayload.copy( querystring = new BasicNameValuePair("ip", "125.12.2.42") :: EnrichSpec.querystring ), - EnrichSpec.colllectorPayload.copy( + EnrichSpec.collectorPayload.copy( querystring = new BasicNameValuePair("ip", "125.12.2.43") :: EnrichSpec.querystring ), - EnrichSpec.colllectorPayload.copy( + EnrichSpec.collectorPayload.copy( querystring = new BasicNameValuePair("ip", "125.12.2.44") :: EnrichSpec.querystring ), )).repeatN(10).map(cp => Payload(cp.toRaw, IO.unit)).covary[IO] diff --git a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala index b7c2d0b96..89a0e34b0 100644 --- a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala +++ b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala @@ -49,13 +49,13 @@ class EtlPipelineBench { @Benchmark def measureProcessEventsIO(state: EtlPipelineBench.BenchState) = { - val payload = EnrichSpec.colllectorPayload + val payload = EnrichSpec.collectorPayload EtlPipeline.processEvents[IO](state.adapterRegistry, state.enrichmentRegistryIo, Client.IgluCentral, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))).unsafeRunSync() } @Benchmark def measureProcessEventsId(state: EtlPipelineBench.BenchState) = { - val payload = EnrichSpec.colllectorPayload + val payload = EnrichSpec.collectorPayload EtlPipeline.processEvents[Id](state.adapterRegistry, state.enrichmentRegistryId, state.clientId, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))) } } diff --git a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala index b1b9e388f..0eea9f9f4 100644 --- a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala +++ b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala @@ -30,7 +30,7 @@ class ThriftLoaderBench { @Benchmark def measureNormalize(state: ThriftLoaderBench.BenchState) = { - Enrich.encodeEvent(state.event) + Enrich.serializeEnriched(state.event) } } @@ -42,7 +42,7 @@ object ThriftLoaderBench { @Setup(Level.Trial) def setup(): Unit = { - data = EnrichSpec.colllectorPayload.toRaw + data = EnrichSpec.collectorPayload.toRaw event = new EnrichedEvent() event.setApp_id("foo") diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala index 482b031b3..29fc7caa0 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala @@ -92,6 +92,9 @@ object EnrichmentManager { enriched.pii = pii.asString } } + _ <- registry.shreddedValidator + .map(_.performCheck(enriched, raw, processor, client)) + .getOrElse(EitherT.rightT[F, BadRow](())) } yield enriched /** diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala index 3e9a06bcd..563eceec1 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala @@ -115,6 +115,7 @@ object EnrichmentRegistry { registry <- er } yield registry.copy(apiRequest = enrichment.some) case c: PiiPseudonymizerConf => er.map(_.copy(piiPseudonymizer = c.enrichment.some)) + case c: ShreddedValidatorEnrichmentConf => er.map(_.copy(shreddedValidator = c.enrichment.some)) case c: SqlQueryConf => for { enrichment <- EitherT.right(c.enrichment[F](blocker)) @@ -185,6 +186,8 @@ object EnrichmentRegistry { IpLookupsEnrichment.parse(enrichmentConfig, schemaKey, localMode).map(_.some) else if (nm == "anon_ip") AnonIpEnrichment.parse(enrichmentConfig, schemaKey).map(_.some) + else if (nm == "shredded_validator_config") + ShreddedValidatorEnrichment.parse(enrichmentConfig, schemaKey).map(_.some) else if (nm == "referer_parser") RefererParserEnrichment.parse(enrichmentConfig, schemaKey, localMode).map(_.some) else if (nm == "campaign_attribution") @@ -234,21 +237,22 @@ object EnrichmentRegistry { /** A registry to hold all of our enrichments. */ final case class EnrichmentRegistry[F[_]]( - apiRequest: Option[ApiRequestEnrichment[F]] = None, - piiPseudonymizer: Option[PiiPseudonymizerEnrichment] = None, - sqlQuery: Option[SqlQueryEnrichment[F]] = None, - anonIp: Option[AnonIpEnrichment] = None, - campaignAttribution: Option[CampaignAttributionEnrichment] = None, - cookieExtractor: Option[CookieExtractorEnrichment] = None, - currencyConversion: Option[CurrencyConversionEnrichment[F]] = None, - eventFingerprint: Option[EventFingerprintEnrichment] = None, - httpHeaderExtractor: Option[HttpHeaderExtractorEnrichment] = None, - iab: Option[IabEnrichment] = None, - ipLookups: Option[IpLookupsEnrichment[F]] = None, - javascriptScript: Option[JavascriptScriptEnrichment] = None, - refererParser: Option[RefererParserEnrichment] = None, - uaParser: Option[UaParserEnrichment] = None, - userAgentUtils: Option[UserAgentUtilsEnrichment] = None, - weather: Option[WeatherEnrichment[F]] = None, - yauaa: Option[YauaaEnrichment] = None + apiRequest: Option[ApiRequestEnrichment[F]] = None, + piiPseudonymizer: Option[PiiPseudonymizerEnrichment] = None, + shreddedValidator: Option[ShreddedValidatorEnrichment] = Some(ShreddedValidatorEnrichment()), + sqlQuery: Option[SqlQueryEnrichment[F]] = None, + anonIp: Option[AnonIpEnrichment] = None, + campaignAttribution: Option[CampaignAttributionEnrichment] = None, + cookieExtractor: Option[CookieExtractorEnrichment] = None, + currencyConversion: Option[CurrencyConversionEnrichment[F]] = None, + eventFingerprint: Option[EventFingerprintEnrichment] = None, + httpHeaderExtractor: Option[HttpHeaderExtractorEnrichment] = None, + iab: Option[IabEnrichment] = None, + ipLookups: Option[IpLookupsEnrichment[F]] = None, + javascriptScript: Option[JavascriptScriptEnrichment] = None, + refererParser: Option[RefererParserEnrichment] = None, + uaParser: Option[UaParserEnrichment] = None, + userAgentUtils: Option[UserAgentUtilsEnrichment] = None, + weather: Option[WeatherEnrichment[F]] = None, + yauaa: Option[YauaaEnrichment] = None ) diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EnrichmentConf.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EnrichmentConf.scala index 20906bd89..b3d9539c3 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EnrichmentConf.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EnrichmentConf.scala @@ -191,6 +191,10 @@ object EnrichmentConf { def enrichment: UserAgentUtilsEnrichment = UserAgentUtilsEnrichment(schemaKey) } + final case class ShreddedValidatorEnrichmentConf(schemaKey: SchemaKey, disable: Boolean) extends EnrichmentConf { + def enrichment: ShreddedValidatorEnrichment = ShreddedValidatorEnrichment(disable) + } + final case class WeatherConf( schemaKey: SchemaKey, apiHost: String, diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/ShreddedValidatorEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/ShreddedValidatorEnrichment.scala new file mode 100644 index 000000000..7ac64f65d --- /dev/null +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/ShreddedValidatorEnrichment.scala @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2012-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry + +import cats.Monad +import cats.data.{EitherT, NonEmptyList, ValidatedNel} +import cats.effect.Clock +import cats.implicits._ +import io.circe.Json +import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup +import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.snowplow.badrows.FailureDetails.{EnrichmentFailureMessage, EnrichmentInformation} +import com.snowplowanalytics.snowplow.badrows.{BadRow, FailureDetails, Processor} +import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent +import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent.toRawEvent +import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentManager +import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ShreddedValidatorEnrichmentConf +import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent +import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent.toPartiallyEnrichedEvent +import com.snowplowanalytics.snowplow.enrich.common.utils.IgluUtils.buildSchemaViolationsBadRow + +object ShreddedValidatorEnrichment extends ParseableEnrichment { + override val supportedSchema: SchemaCriterion = + SchemaCriterion( + "com.snowplowanalytics.snowplow.enrichments", + "shredded_validator_config", + "jsonschema", + 1, + 0, + 0 + ) + + /** + * Creates an AnonIpEnrichment instance from a Json. + * @param config The anon_ip enrichment JSON + * @param schemaKey provided for the enrichment, must be supported by this enrichment + * @return an AnonIpEnrichment configuration + */ + override def parse( + config: Json, + schemaKey: SchemaKey, + localMode: Boolean = false + ): ValidatedNel[String, ShreddedValidatorEnrichmentConf] = + isParseable(config, schemaKey) + .flatMap(_ => + config.hcursor + .getOrElse[Boolean]("disable")(false) + .leftMap(_.getMessage()) + ) + .map(disable => ShreddedValidatorEnrichmentConf(schemaKey, disable)) + .toValidatedNel +} + +case class ShreddedValidatorEnrichment(disable: Boolean = false) extends Enrichment { + def schemaKey: SchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1, 0, 0)) + + def performCheck[F[_]]( + e: EnrichedEvent, + re: RawEvent, + processor: Processor, + client: Client[F, Json] + )( + implicit + M: Monad[F], + L: RegistryLookup[F], + C: Clock[F] + ): EitherT[F, BadRow, Unit] = + if (!disable) + e.toShreddedEvent + .leftMap(err => + EnrichmentManager.buildEnrichmentFailuresBadRow( + NonEmptyList.one( + FailureDetails.EnrichmentFailure(Some(EnrichmentInformation(schemaKey, identifier = "shredded_validator")), + EnrichmentFailureMessage.Simple(err.getMessage) + ) + ), + toPartiallyEnrichedEvent(e), + toRawEvent(re), + processor + ) + ) + .toEitherT[F] + .flatMap(shreded => + client + .check( + SelfDescribingData(schemaKey, shreded) + ) + .leftMap(err => + buildSchemaViolationsBadRow( + NonEmptyList.one( + FailureDetails.SchemaViolation.IgluError(schemaKey, err) + ), + toPartiallyEnrichedEvent(e), + toRawEvent(re), + processor + ) + ) + ) + else + EitherT.rightT(()) +} diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala index 543a837ba..a58491768 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/outputs/EnrichedEvent.scala @@ -15,10 +15,11 @@ package com.snowplowanalytics.snowplow.enrich.common.outputs import java.lang.{Integer => JInteger} import java.lang.{Float => JFloat} import java.lang.{Byte => JByte} - +import cats.implicits._ import scala.beans.BeanProperty - import com.snowplowanalytics.snowplow.badrows.Payload.PartiallyEnrichedEvent +import io.circe.Json +import java.time.format.DateTimeFormatter /** * The canonical output format for enriched events. @@ -246,9 +247,163 @@ class EnrichedEvent extends Serializable { // Fields modified in PII enrichemnt (JSON String) @BeanProperty var pii: String = _ + + private val JsonSchemaDateTimeFormat = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") + + def toKv(k: String, v: String): Seq[(String, Json)] = if (v != null) Seq((k, Json.fromString(v))) else Seq.empty[(String, Json)] + def toKv(k: String, v: JByte): Seq[(String, Json)] = if (v != null) Seq((k, Json.fromBoolean(v == 1))) else Seq.empty[(String, Json)] + def toKv(k: String, v: JInteger): Seq[(String, Json)] = if (v != null) Seq((k, Json.fromInt(v))) else Seq.empty[(String, Json)] + def toKv(k: String, v: JFloat): Seq[(String, Json)] = if (v != null) Seq((k, Json.fromFloatOrNull(v))) else Seq.empty[(String, Json)] + def toDateKv(k: String, v: String): Seq[(String, Json)] = + if (v != null) + Seq( + (k, + Json.fromString( + DateTimeFormatter.ISO_DATE_TIME.format(JsonSchemaDateTimeFormat.parse(v)) + ) + ) + ) + else Seq.empty[(String, Json)] + + def toShreddedEvent: Either[Throwable, Json] = + Either.catchNonFatal( + Json.fromFields( + toKv("app_id", this.app_id) ++ + toKv("platform", this.platform) ++ + toDateKv("etl_tstamp", this.etl_tstamp) ++ + toDateKv("collector_tstamp", this.collector_tstamp) ++ + toDateKv("dvce_created_tstamp", this.dvce_created_tstamp) ++ + toKv("event", this.event) ++ + toKv("event_id", this.event_id) ++ + toKv("txn_id", this.txn_id) ++ + toKv("name_tracker", this.name_tracker) ++ + toKv("v_tracker", this.v_tracker) ++ + toKv("v_collector", this.v_collector) ++ + toKv("v_etl", this.v_etl) ++ + toKv("user_id", this.user_id) ++ + toKv("user_ipaddress", this.user_ipaddress) ++ + toKv("user_fingerprint", this.user_fingerprint) ++ + toKv("domain_userid", this.domain_userid) ++ + toKv("domain_sessionidx", this.domain_sessionidx) ++ + toKv("network_userid", this.network_userid) ++ + toKv("geo_country", this.geo_country) ++ + toKv("geo_region", this.geo_region) ++ + toKv("geo_city", this.geo_city) ++ + toKv("geo_zipcode", this.geo_zipcode) ++ + toKv("geo_latitude", this.geo_latitude) ++ + toKv("geo_longitude", this.geo_longitude) ++ + toKv("geo_region_name", this.geo_region_name) ++ + toKv("ip_isp", this.ip_isp) ++ + toKv("ip_organization", this.ip_organization) ++ + toKv("ip_domain", this.ip_domain) ++ + toKv("ip_netspeed", this.ip_netspeed) ++ + toKv("page_url", this.page_url) ++ + toKv("page_title", this.page_title) ++ + toKv("page_referrer", this.page_referrer) ++ + toKv("page_urlscheme", this.page_urlscheme) ++ + toKv("page_urlhost", this.page_urlhost) ++ + toKv("page_urlport", this.page_urlport) ++ + toKv("page_urlpath", this.page_urlpath) ++ + toKv("page_urlquery", this.page_urlquery) ++ + toKv("page_urlfragment", this.page_urlfragment) ++ + toKv("refr_urlscheme", this.refr_urlscheme) ++ + toKv("refr_urlhost", this.refr_urlhost) ++ + toKv("refr_urlport", this.refr_urlport) ++ + toKv("refr_urlpath", this.refr_urlpath) ++ + toKv("refr_urlquery", this.refr_urlquery) ++ + toKv("refr_urlfragment", this.refr_urlfragment) ++ + toKv("refr_medium", this.refr_medium) ++ + toKv("refr_source", this.refr_source) ++ + toKv("refr_term", this.refr_term) ++ + toKv("mkt_medium", this.mkt_medium) ++ + toKv("mkt_source", this.mkt_source) ++ + toKv("mkt_term", this.mkt_term) ++ + toKv("mkt_content", this.mkt_content) ++ + toKv("mkt_campaign", this.mkt_campaign) ++ + toKv("se_category", this.se_category) ++ + toKv("se_action", this.se_action) ++ + toKv("se_label", this.se_label) ++ + toKv("se_property", this.se_property) ++ + toKv("se_value", this.se_value) ++ + toKv("tr_orderid", this.tr_orderid) ++ + toKv("tr_affiliation", this.tr_affiliation) ++ + toKv("tr_total", this.tr_total) ++ + toKv("tr_tax", this.tr_tax) ++ + toKv("tr_shipping", this.tr_shipping) ++ + toKv("tr_city", this.tr_city) ++ + toKv("tr_state", this.tr_state) ++ + toKv("tr_country", this.tr_country) ++ + toKv("ti_orderid", this.ti_orderid) ++ + toKv("ti_sku", this.ti_sku) ++ + toKv("ti_name", this.ti_name) ++ + toKv("ti_category", this.ti_category) ++ + toKv("ti_price", this.ti_price) ++ + toKv("ti_quantity", this.ti_quantity) ++ + toKv("pp_xoffset_min", this.pp_xoffset_min) ++ + toKv("pp_xoffset_max", this.pp_xoffset_max) ++ + toKv("pp_yoffset_min", this.pp_yoffset_min) ++ + toKv("pp_yoffset_max", this.pp_yoffset_max) ++ + toKv("useragent", this.useragent) ++ + toKv("br_name", this.br_name) ++ + toKv("br_family", this.br_family) ++ + toKv("br_version", this.br_version) ++ + toKv("br_type", this.br_type) ++ + toKv("br_renderengine", this.br_renderengine) ++ + toKv("br_lang", this.br_lang) ++ + toKv("br_features_pdf", this.br_features_pdf) ++ + toKv("br_features_flash", this.br_features_flash) ++ + toKv("br_features_java", this.br_features_java) ++ + toKv("br_features_director", this.br_features_director) ++ + toKv("br_features_quicktime", this.br_features_quicktime) ++ + toKv("br_features_realplayer", this.br_features_realplayer) ++ + toKv("br_features_windowsmedia", this.br_features_windowsmedia) ++ + toKv("br_features_gears", this.br_features_gears) ++ + toKv("br_features_silverlight", this.br_features_silverlight) ++ + toKv("br_cookies", this.br_cookies) ++ + toKv("br_colordepth", this.br_colordepth) ++ + toKv("br_viewwidth", this.br_viewwidth) ++ + toKv("br_viewheight", this.br_viewheight) ++ + toKv("os_name", this.os_name) ++ + toKv("os_family", this.os_family) ++ + toKv("os_manufacturer", this.os_manufacturer) ++ + toKv("os_timezone", this.os_timezone) ++ + toKv("dvce_type", this.dvce_type) ++ + toKv("dvce_ismobile", this.dvce_ismobile) ++ + toKv("dvce_screenwidth", this.dvce_screenwidth) ++ + toKv("dvce_screenheight", this.dvce_screenheight) ++ + toKv("doc_charset", this.doc_charset) ++ + toKv("doc_width", this.doc_width) ++ + toKv("doc_height", this.doc_height) ++ + toKv("tr_currency", this.tr_currency) ++ + toKv("tr_total_base", this.tr_total_base) ++ + toKv("tr_tax_base", this.tr_tax_base) ++ + toKv("tr_shipping_base", this.tr_shipping_base) ++ + toKv("ti_currency", this.ti_currency) ++ + toKv("ti_price_base", this.ti_price_base) ++ + toKv("base_currency", this.base_currency) ++ + toKv("geo_timezone", this.geo_timezone) ++ + toKv("mkt_clickid", this.mkt_clickid) ++ + toKv("mkt_network", this.mkt_network) ++ + toKv("etl_tags", this.etl_tags) ++ + toDateKv("dvce_sent_tstamp", this.dvce_sent_tstamp) ++ + toKv("refr_domain_userid", this.refr_domain_userid) ++ + toDateKv("refr_dvce_tstamp", this.refr_dvce_tstamp) ++ + toKv("domain_sessionid", this.domain_sessionid) ++ + toDateKv("derived_tstamp", this.derived_tstamp) ++ + toKv("event_vendor", this.event_vendor) ++ + toKv("event_name", this.event_name) ++ + toKv("event_format", this.event_format) ++ + toKv("event_version", this.event_version) ++ + toKv("event_fingerprint", this.event_fingerprint) ++ + toDateKv("true_tstamp", this.true_tstamp) + ) + ) + } object EnrichedEvent { + def toPartiallyEnrichedEvent(enrichedEvent: EnrichedEvent): PartiallyEnrichedEvent = PartiallyEnrichedEvent( app_id = Option(enrichedEvent.app_id), diff --git a/modules/common/src/test/resources/iglu-schemas/schemas/com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0 b/modules/common/src/test/resources/iglu-schemas/schemas/com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0 new file mode 100644 index 000000000..0b7ff5790 --- /dev/null +++ b/modules/common/src/test/resources/iglu-schemas/schemas/com.snowplowanalytics.snowplow/atomic/jsonschema/1-0-0 @@ -0,0 +1,489 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "description": "Schema for an atomic canonical Snowplow event", + "self": { + "vendor": "com.snowplowanalytics.snowplow", + "name": "atomic", + "format": "jsonschema", + "version": "1-0-0" + }, + "type": "object", + "properties": { + "app_id": { + "type": ["string", "null"], + "maxLength": 255 + }, + "platform": { + "type": ["string", "null"], + "maxLength": 255 + }, + "etl_tstamp": { + "type": ["string", "null"], + "format": "date-time" + }, + "collector_tstamp": { + "type": "string", + "format": "date-time" + }, + "dvce_created_tstamp": { + "type": ["string", "null"], + "format": "date-time" + }, + "event": { + "type": ["string", "null"], + "maxLength": 128 + }, + "event_id": { + "type": "string", + "maxLength": 36 + }, + "txn_id": { + "type": ["integer", "null"] + }, + "name_tracker": { + "type": ["string", "null"], + "maxLength": 128 + }, + "v_tracker": { + "type": ["string", "null"], + "maxLength": 100 + }, + "v_collector": { + "type": "string", + "maxLength": 100 + }, + "v_etl": { + "type": "string", + "maxLength": 100 + }, + "user_id": { + "type": ["string", "null"], + "maxLength": 255 + }, + "user_ipaddress": { + "type": ["string", "null"], + "maxLength": 128 + }, + "user_fingerprint": { + "type": ["string", "null"], + "maxLength": 128 + }, + "domain_userid": { + "type": ["string", "null"], + "maxLength": 128 + }, + "domain_sessionidx": { + "type": ["integer", "null"] + }, + "network_userid": { + "type": ["string", "null"], + "maxLength": 128 + }, + "geo_country": { + "type": ["string", "null"], + "maxLength": 2 + }, + "geo_region": { + "type": ["string", "null"], + "maxLength": 3 + }, + "geo_city": { + "type": ["string", "null"], + "maxLength": 75 + }, + "geo_zipcode": { + "type": ["string", "null"], + "maxLength": 15 + }, + "geo_latitude": { + "type": ["number", "null"] + }, + "geo_longitude": { + "type": ["number", "null"] + }, + "geo_region_name": { + "type": ["string", "null"], + "maxLength": 100 + }, + "ip_isp": { + "type": ["string", "null"], + "maxLength": 100 + }, + "ip_organization": { + "type": ["string", "null"], + "maxLength": 128 + }, + "ip_domain": { + "type": ["string", "null"], + "maxLength": 128 + }, + "ip_netspeed": { + "type": ["string", "null"], + "maxLength": 100 + }, + "page_url": { + "type": ["string", "null"], + "maxLength": 4096 + }, + "page_title": { + "type": ["string", "null"], + "maxLength": 2000 + }, + "page_referrer": { + "type": ["string", "null"], + "maxLength": 4096 + }, + "page_urlscheme": { + "type": ["string", "null"], + "maxLength": 16 + }, + "page_urlhost": { + "type": ["string", "null"], + "maxLength": 255 + }, + "page_urlport": { + "type": ["integer", "null"] + }, + "page_urlpath": { + "type": ["string", "null"], + "maxLength": 3000 + }, + "page_urlquery": { + "type": ["string", "null"], + "maxLength": 6000 + }, + "page_urlfragment": { + "type": ["string", "null"], + "maxLength": 3000 + }, + "refr_urlscheme": { + "type": ["string", "null"], + "maxLength": 16 + }, + "refr_urlhost": { + "type": ["string", "null"], + "maxLength": 255 + }, + "refr_urlport": { + "type": ["integer", "null"] + }, + "refr_urlpath": { + "type": ["string", "null"], + "maxLength": 6000 + }, + "refr_urlquery": { + "type": ["string", "null"], + "maxLength": 6000 + }, + "refr_urlfragment": { + "type": ["string", "null"], + "maxLength": 3000 + }, + "refr_medium": { + "type": ["string", "null"], + "maxLength": 25 + }, + "refr_source": { + "type": ["string", "null"], + "maxLength": 50 + }, + "refr_term": { + "type": ["string", "null"], + "maxLength": 255 + }, + "mkt_medium": { + "type": ["string", "null"], + "maxLength": 255 + }, + "mkt_source": { + "type": ["string", "null"], + "maxLength": 255 + }, + "mkt_term": { + "type": ["string", "null"], + "maxLength": 255 + }, + "mkt_content": { + "type": ["string", "null"], + "maxLength": 500 + }, + "mkt_campaign": { + "type": ["string", "null"], + "maxLength": 255 + }, + "se_category": { + "type": ["string", "null"], + "maxLength": 1000 + }, + "se_action": { + "type": ["string", "null"], + "maxLength": 1000 + }, + "se_label": { + "type": ["string", "null"], + "maxLength": 4096 + }, + "se_property": { + "type": ["string", "null"], + "maxLength": 1000 + }, + "se_value": { + "type": ["number", "null"] + }, + "tr_orderid": { + "type": ["string", "null"], + "maxLength": 255 + }, + "tr_affiliation": { + "type": ["string", "null"], + "maxLength": 255 + }, + "tr_total": { + "type": ["number", "null"] + }, + "tr_tax": { + "type": ["number", "null"] + }, + "tr_shipping": { + "type": ["number", "null"] + }, + "tr_city": { + "type": ["string", "null"], + "maxLength": 255 + }, + "tr_state": { + "type": ["string", "null"], + "maxLength": 255 + }, + "tr_country": { + "type": ["string", "null"], + "maxLength": 255 + }, + "ti_orderid": { + "type": ["string", "null"], + "maxLength": 255 + }, + "ti_sku": { + "type": ["string", "null"], + "maxLength": 255 + }, + "ti_name": { + "type": ["string", "null"], + "maxLength": 255 + }, + "ti_category": { + "type": ["string", "null"], + "maxLength": 255 + }, + "ti_price": { + "type": ["number", "null"] + }, + "ti_quantity": { + "type": ["integer", "null"] + }, + "pp_xoffset_min": { + "type": ["integer", "null"] + }, + "pp_xoffset_max": { + "type": ["integer", "null"] + }, + "pp_yoffset_min": { + "type": ["integer", "null"] + }, + "pp_yoffset_max": { + "type": ["integer", "null"] + }, + "useragent": { + "type": ["string", "null"], + "maxLength": 1000 + }, + "br_name": { + "type": ["string", "null"], + "maxLength": 50 + }, + "br_family": { + "type": ["string", "null"], + "maxLength": 50 + }, + "br_version": { + "type": ["string", "null"], + "maxLength": 50 + }, + "br_type": { + "type": ["string", "null"], + "maxLength": 50 + }, + "br_renderengine": { + "type": ["string", "null"], + "maxLength": 50 + }, + "br_lang": { + "type": ["string", "null"], + "maxLength": 255 + }, + "br_features_pdf": { + "type": ["boolean", "null"] + }, + "br_features_flash": { + "type": ["boolean", "null"] + }, + "br_features_java": { + "type": ["boolean", "null"] + }, + "br_features_director": { + "type": ["boolean", "null"] + }, + "br_features_quicktime": { + "type": ["boolean", "null"] + }, + "br_features_realplayer": { + "type": ["boolean", "null"] + }, + "br_features_windowsmedia": { + "type": ["boolean", "null"] + }, + "br_features_gears": { + "type": ["boolean", "null"] + }, + "br_features_silverlight": { + "type": ["boolean", "null"] + }, + "br_cookies": { + "type": ["boolean", "null"] + }, + "br_colordepth": { + "type": ["string", "null"], + "maxLength": 12 + }, + "br_viewwidth": { + "type": ["integer", "null"] + }, + "br_viewheight": { + "type": ["integer", "null"] + }, + "os_name": { + "type": ["string", "null"], + "maxLength": 50 + }, + "os_family": { + "type": ["string", "null"], + "maxLength": 50 + }, + "os_manufacturer": { + "type": ["string", "null"], + "maxLength": 50 + }, + "os_timezone": { + "type": ["string", "null"], + "maxLength": 255 + }, + "dvce_type": { + "type": ["string", "null"], + "maxLength": 50 + }, + "dvce_ismobile": { + "type": ["boolean", "null"] + }, + "dvce_screenwidth": { + "type": ["integer", "null"] + }, + "dvce_screenheight": { + "type": ["integer", "null"] + }, + "doc_charset": { + "type": ["string", "null"], + "maxLength": 128 + }, + "doc_width": { + "type": ["integer", "null"] + }, + "doc_height": { + "type": ["integer", "null"] + }, + "tr_currency": { + "type": ["string", "null"], + "maxLength": 3 + }, + "tr_total_base": { + "type": ["number", "null"] + }, + "tr_tax_base": { + "type": ["number", "null"] + }, + "tr_shipping_base": { + "type": ["number", "null"] + }, + "ti_currency": { + "type": ["string", "null"], + "maxLength": 3 + }, + "ti_price_base": { + "type": ["number", "null"] + }, + "base_currency": { + "type": ["string", "null"], + "maxLength": 3 + }, + "geo_timezone": { + "type": ["string", "null"], + "maxLength": 64 + }, + "mkt_clickid": { + "type": ["string", "null"], + "maxLength": 128 + }, + "mkt_network": { + "type": ["string", "null"], + "maxLength": 64 + }, + "etl_tags": { + "type": ["string", "null"], + "maxLength": 500 + }, + "dvce_sent_tstamp": { + "type": ["string", "null"], + "format": "date-time" + }, + "refr_domain_userid": { + "type": ["string", "null"], + "maxLength": 128 + }, + "refr_dvce_tstamp": { + "type": ["string", "null"], + "format": "date-time" + }, + "domain_sessionid": { + "type": ["string", "null"], + "maxLength": 128 + }, + "derived_tstamp": { + "type": ["string", "null"], + "format": "date-time" + }, + "event_vendor": { + "type": ["string", "null"], + "maxLength": 1000 + }, + "event_name": { + "type": ["string", "null"], + "maxLength": 1000 + }, + "event_format": { + "type": ["string", "null"], + "maxLength": 128 + }, + "event_version": { + "type": ["string", "null"], + "maxLength": 128 + }, + "event_fingerprint": { + "type": ["string", "null"], + "maxLength": 128 + }, + "true_tstamp": { + "type": ["string", "null"], + "format": "date-time" + } + }, + "additionalProperties": false +} \ No newline at end of file diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/ShreddedValidatorEnrichmentSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/ShreddedValidatorEnrichmentSpec.scala new file mode 100644 index 000000000..117aab47a --- /dev/null +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/ShreddedValidatorEnrichmentSpec.scala @@ -0,0 +1,154 @@ +package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry + +import cats.implicits._ +import cats.effect.{Blocker, Clock, IO} +import org.specs2.matcher.MustMatchers.{left => _, right => _} +import org.specs2.mutable.Specification + +import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.resolver.registries.Registry +import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} +import com.snowplowanalytics.snowplow.badrows.Processor +import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline +import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry +import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry +import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload + +import io.circe.literal._ +import org.joda.time.DateTime + +import scala.concurrent.ExecutionContext + +class ShreddedValidatorEnrichmentSpec extends Specification { + implicit val ioClock: Clock[IO] = Clock.create[IO] + val adapterRegistry = new AdapterRegistry() + val enrichmentReg = EnrichmentRegistry[IO]() + val disabledEnrichReg = EnrichmentRegistry[IO](shreddedValidator = Some(ShreddedValidatorEnrichment(disable = true))) + val igluCentral = Registry.IgluCentral + val client = Client.parseDefault[IO](json""" + { + "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1", + "data": { + "cacheSize": 500, + "repositories": [ + { + "name": "Iglu Central", + "priority": 0, + "vendorPrefixes": [ "com.snowplowanalytics" ], + "connection": { + "http": { + "uri": "http://iglucentral.com" + } + } + }, + { + "name": "Iglu Central - GCP Mirror", + "priority": 1, + "vendorPrefixes": [ "com.snowplowanalytics" ], + "connection": { + "http": { + "uri": "http://mirror01.iglucentral.com" + } + } + } + ] + } + } + """) + val processor = Processor("sce-test-suite", "1.0.0") + val dateTime = DateTime.now() + val blocker: Blocker = Blocker.liftExecutionContext(ExecutionContext.global) + + def processEvents(e: CollectorPayload): IO[Boolean] = + EtlPipeline + .processEvents[IO]( + adapterRegistry, + enrichmentReg, + client.rethrowT.unsafeRunSync(), + processor, + dateTime, + Some(e).validNel + ) + .map(_.forall(p => p.isValid)) + + def processEventsDisabled(e: CollectorPayload): IO[Boolean] = + EtlPipeline + .processEvents[IO]( + adapterRegistry, + disabledEnrichReg, + client.rethrowT.unsafeRunSync(), + processor, + dateTime, + Some(e).validNel + ) + .map(_.forall(p => p.isValid)) + + val body = SpecHelpers.toSelfDescJson( + """[{"tv":"ios-0.1.0","p":"mob","e":"se"}]""", + "payload_data" + ) + + val longStr = "s" * 500 + val fatBody = SpecHelpers.toSelfDescJson( + s"""[{"tv":"$longStr","p":"mob","e":"se" , "tr_ci": "$longStr"}]""", + "payload_data" + ) + + val ApplicationJsonWithCapitalCharset = "application/json; charset=UTF-8" + object Snowplow { + private val api: (String) => CollectorPayload.Api = version => CollectorPayload.Api("com.snowplowanalytics.snowplow", version) + val Tp2 = api("tp2") + } + + object Shared { + val api = CollectorPayload.Api("com.statusgator", "v1") + val source = CollectorPayload.Source("clj-tomcat", "UTF-8", None) + val context = CollectorPayload.Context( + DateTime.parse("2013-08-29T00:18:48.000+00:00").some, + "37.157.33.123".some, + None, + None, + Nil, + None + ) + } + + val payload = CollectorPayload( + Snowplow.Tp2, + SpecHelpers.toNameValuePairs("tv" -> "0", "nuid" -> "123"), + ApplicationJsonWithCapitalCharset.some, + body.some, + Shared.source, + Shared.context + ) + + val fatPayload = CollectorPayload( + Snowplow.Tp2, + SpecHelpers.toNameValuePairs("tv" -> "0", "nuid" -> "123"), + ApplicationJsonWithCapitalCharset.some, + fatBody.some, + Shared.source, + Shared.context + ) +// val oversized = CollectorPayload(api, Nil, None, None, source, context) + + "config should parse" >> { + val configJson = json"""{ + "disable": true + }""" + val schemaKey = SchemaKey( + "com.snowplowanalytics.snowplow.enrichments", + "shredded_validator_config", + "jsonschema", + SchemaVer.Full(1, 0, 0) + ) + val config = ShreddedValidatorEnrichment.parse(configJson, schemaKey) + config.map(_.disable).getOrElse(false) must beTrue + + } + + "disabled enrichment should let oversized data though" >> { processEventsDisabled(fatPayload).unsafeRunSync() must beTrue } + "invalid payload should create BadRow" >> { processEvents(fatPayload).unsafeRunSync() must beFalse } + "valid payload should not throw BadRaw" >> { processEvents(payload).unsafeRunSync() must beTrue } +}