diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala new file mode 100644 index 000000000..7bd66795d --- /dev/null +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala @@ -0,0 +1,168 @@ +package com.snowplowanalytics.snowplow.enrich.common.enrichments + +import cats.Monad +import cats.data.Validated.{Invalid, Valid} +import cats.data.{NonEmptyList, ValidatedNel} +import cats.implicits._ +import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailure +import com.snowplowanalytics.snowplow.badrows.{BadRow, FailureDetails, Processor} +import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent +import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent +import org.slf4j.LoggerFactory + +/** + * Atomic fields length validation inspired by + * https://github.com/snowplow/snowplow-scala-analytics-sdk/blob/master/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/validate/package.scala + * */ +object AtomicFieldsLengthValidator { + + final case class AtomicField(name: String, + enrichedValueExtractor: EnrichedEvent => String, + maxLength: Int) + + private val logger = LoggerFactory.getLogger("AtomicLengthValidator") + + private val atomicFields = + List( + AtomicField(name = "app_id", _.app_id, maxLength = 255 ), + AtomicField(name = "platform", _.platform, maxLength = 255 ), + AtomicField(name = "event", _.event, maxLength = 128 ), + AtomicField(name = "event_id", _.event, maxLength = 36 ), + AtomicField(name = "name_tracker", _.name_tracker, maxLength = 128 ), + AtomicField(name = "v_tracker", _.v_tracker, maxLength = 100 ), + AtomicField(name = "v_collector", _.v_collector, maxLength = 100 ), + AtomicField(name = "v_etl", _.v_etl, maxLength = 100 ), + AtomicField(name = "user_id", _.user_id, maxLength = 255 ), + AtomicField(name = "user_ipaddress", _.user_ipaddress, maxLength = 128 ), + AtomicField(name = "user_fingerprint", _.user_fingerprint, maxLength = 128 ), + AtomicField(name = "domain_userid", _.domain_userid, maxLength = 128 ), + AtomicField(name = "network_userid", _.network_userid, maxLength = 128 ), + AtomicField(name = "geo_country", _.geo_country, maxLength = 2 ), + AtomicField(name = "geo_region", _.geo_region, maxLength = 3 ), + AtomicField(name = "geo_city", _.geo_city, maxLength = 75 ), + AtomicField(name = "geo_zipcode", _.geo_zipcode, maxLength = 15 ), + AtomicField(name = "geo_region_name", _.geo_region_name, maxLength = 100 ), + AtomicField(name = "ip_isp", _.ip_isp, maxLength = 100 ), + AtomicField(name = "ip_organization", _.ip_organization, maxLength = 128 ), + AtomicField(name = "ip_domain", _.ip_domain, maxLength = 128 ), + AtomicField(name = "ip_netspeed", _.ip_netspeed, maxLength = 100 ), + AtomicField(name = "page_url", _.page_url, maxLength = 4096), + AtomicField(name = "page_title", _.page_title, maxLength = 2000), + AtomicField(name = "page_referrer", _.page_referrer, maxLength = 4096), + AtomicField(name = "page_urlscheme", _.page_urlscheme, maxLength = 16 ), + AtomicField(name = "page_urlhost", _.page_urlhost, maxLength = 255 ), + AtomicField(name = "page_urlpath", _.page_urlpath, maxLength = 3000), + AtomicField(name = "page_urlquery", _.page_urlquery, maxLength = 6000), + AtomicField(name = "page_urlfragment", _.page_urlfragment, maxLength = 3000), + AtomicField(name = "refr_urlscheme", _.refr_urlscheme, maxLength = 16 ), + AtomicField(name = "refr_urlhost", _.refr_urlhost, maxLength = 255 ), + AtomicField(name = "refr_urlpath", _.refr_urlpath, maxLength = 6000), + AtomicField(name = "refr_urlquery", _.refr_urlquery, maxLength = 6000), + AtomicField(name = "refr_urlfragment", _.refr_urlfragment, maxLength = 3000), + AtomicField(name = "refr_medium", _.refr_medium, maxLength = 25 ), + AtomicField(name = "refr_source", _.refr_source, maxLength = 50 ), + AtomicField(name = "refr_term", _.refr_term, maxLength = 255 ), + AtomicField(name = "mkt_medium", _.mkt_medium, maxLength = 255 ), + AtomicField(name = "mkt_source", _.mkt_source, maxLength = 255 ), + AtomicField(name = "mkt_term", _.mkt_term, maxLength = 255 ), + AtomicField(name = "mkt_content", _.mkt_content, maxLength = 500 ), + AtomicField(name = "mkt_campaign", _.mkt_campaign, maxLength = 255 ), + AtomicField(name = "se_category", _.se_category, maxLength = 1000), + AtomicField(name = "se_action", _.se_action, maxLength = 1000), + AtomicField(name = "se_label", _.se_label, maxLength = 4096), + AtomicField(name = "se_property", _.se_property, maxLength = 1000), + AtomicField(name = "tr_orderid", _.tr_orderid, maxLength = 255 ), + AtomicField(name = "tr_affiliation", _.tr_affiliation, maxLength = 255 ), + AtomicField(name = "tr_city", _.tr_city, maxLength = 255 ), + AtomicField(name = "tr_state", _.tr_state, maxLength = 255 ), + AtomicField(name = "tr_country", _.tr_country, maxLength = 255 ), + AtomicField(name = "ti_orderid", _.ti_orderid, maxLength = 255 ), + AtomicField(name = "ti_sku", _.ti_sku, maxLength = 255 ), + AtomicField(name = "ti_name", _.ti_name, maxLength = 255 ), + AtomicField(name = "ti_category", _.ti_category, maxLength = 255 ), + AtomicField(name = "useragent", _.useragent, maxLength = 1000), + AtomicField(name = "br_name", _.br_name, maxLength = 50 ), + AtomicField(name = "br_family", _.br_family, maxLength = 50 ), + AtomicField(name = "br_version", _.br_version, maxLength = 50 ), + AtomicField(name = "br_type", _.br_type, maxLength = 50 ), + AtomicField(name = "br_renderengine", _.br_renderengine, maxLength = 50 ), + AtomicField(name = "br_lang", _.br_lang, maxLength = 255 ), + AtomicField(name = "br_colordepth", _.br_colordepth, maxLength = 12 ), + AtomicField(name = "os_name", _.os_name, maxLength = 50 ), + AtomicField(name = "os_family", _.os_family, maxLength = 50 ), + AtomicField(name = "os_manufacturer", _.os_manufacturer, maxLength = 50 ), + AtomicField(name = "os_timezone", _.os_timezone, maxLength = 255 ), + AtomicField(name = "dvce_type", _.dvce_type, maxLength = 50 ), + AtomicField(name = "doc_charset", _.doc_charset, maxLength = 128 ), + AtomicField(name = "tr_currency", _.tr_currency, maxLength = 3 ), + AtomicField(name = "ti_currency", _.ti_currency, maxLength = 3 ), + AtomicField(name = "base_currency", _.base_currency, maxLength = 3 ), + AtomicField(name = "geo_timezone", _.geo_timezone, maxLength = 64 ), + AtomicField(name = "mkt_clickid", _.mkt_clickid, maxLength = 128 ), + AtomicField(name = "mkt_network", _.mkt_network, maxLength = 64 ), + AtomicField(name = "etl_tags", _.etl_tags, maxLength = 500 ), + AtomicField(name = "refr_domain_userid", _.refr_domain_userid,maxLength = 128 ), + AtomicField(name = "domain_sessionid", _.domain_sessionid, maxLength = 128 ), + AtomicField(name = "event_vendor", _.event_vendor, maxLength = 1000), + AtomicField(name = "event_name", _.event_name, maxLength = 1000), + AtomicField(name = "event_format", _.event_format, maxLength = 128 ), + AtomicField(name = "event_version", _.event_version, maxLength = 128 ), + AtomicField(name = "event_fingerprint", _.event_fingerprint, maxLength = 128 ) + ) + + def validate[F[_]: Monad](event: EnrichedEvent, + rawEvent: RawEvent, + processor: Processor, + acceptInvalid: Boolean, + invalidCount: F[Unit]): F[Either[BadRow, Unit]] = + atomicFields + .map(validateField(event)) + .combineAll + .leftMap(buildBadRow(event, rawEvent, processor)) match { + case Invalid(badRow) if acceptInvalid => + handleAcceptableBadRow(invalidCount, badRow) *> Monad[F].pure(Right(())) + case Invalid(badRow) => + Monad[F].pure(Left(badRow)) + case Valid(()) => + Monad[F].pure(Right(())) + } + + private def validateField(event: EnrichedEvent) + (atomicField: AtomicField): ValidatedNel[String, Unit] = { + val actualValue = atomicField.enrichedValueExtractor(event) + if (actualValue != null && actualValue.length > atomicField.maxLength) //TODO values can be null? + s"Field ${atomicField.name} longer than maximum allowed size ${atomicField.maxLength}".invalidNel + else + Valid(()) + } + + private def buildBadRow(event: EnrichedEvent, + rawEvent: RawEvent, + processor: Processor) + (errors: NonEmptyList[String]): BadRow.EnrichmentFailures = { + EnrichmentManager.buildEnrichmentFailuresBadRow( + NonEmptyList.of( + + //TODO why 2 messages? + asEnrichmentFailure("Enriched event does not conform to atomic schema field's length restrictions"), + asEnrichmentFailure(errors.toList.mkString(",")) + ), + EnrichedEvent.toPartiallyEnrichedEvent(event), + RawEvent.toRawEvent(rawEvent), + processor + ) + } + + private def handleAcceptableBadRow[F[_]: Monad](invalidCount: F[Unit], + badRow: BadRow.EnrichmentFailures): F[Unit] = + invalidCount *> + Monad[F].pure(logger.debug(s"Enriched event not valid against atomic schema. Bad row: ${badRow.compact}")) + + + private def asEnrichmentFailure(errorMessage: String): EnrichmentFailure = { + EnrichmentFailure( + enrichment = None, + FailureDetails.EnrichmentFailureMessage.Simple(errorMessage) + ) + } +} diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala index f3cdf264b..48179306a 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala @@ -17,7 +17,6 @@ import java.nio.charset.Charset import java.net.URI import java.time.Instant import org.joda.time.DateTime -import org.slf4j.LoggerFactory import io.circe.Json import cats.Monad import cats.data.{EitherT, NonEmptyList, OptionT, ValidatedNel} @@ -29,12 +28,11 @@ import com.snowplowanalytics.refererparser._ import com.snowplowanalytics.iglu.client.Client import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.iglu.core.SelfDescribingData import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.badrows._ import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processor} -import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailure import adapters.RawEvent import enrichments.{EventEnrichments => EE} @@ -49,9 +47,6 @@ import utils.{IgluUtils, ConversionUtils => CU} object EnrichmentManager { - val atomicSchema: SchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1, 0, 0)) - - private val logger = LoggerFactory.getLogger("InvalidEnriched") /** * Run the enrichment workflow @@ -101,7 +96,7 @@ object EnrichmentManager { enriched.pii = pii.asString } } - _ <- validateEnriched(enriched, raw, processor, client, acceptInvalid, invalidCount) + _ <- validateEnriched(enriched, raw, processor, acceptInvalid, invalidCount) } yield enriched /** @@ -767,63 +762,14 @@ object EnrichmentManager { enriched: EnrichedEvent, raw: RawEvent, processor: Processor, - client: Client[F, Json], acceptInvalid: Boolean, invalidCount: F[Unit] - ): EitherT[F, BadRow, Unit] = - EitherT( - for { - validated <- EnrichedEvent - .toAtomic(enriched) - .leftMap(err => - EnrichmentManager.buildEnrichmentFailuresBadRow( - NonEmptyList( - EnrichmentFailure( - None, - FailureDetails.EnrichmentFailureMessage.Simple( - "Error during conversion of enriched event to the atomic format" - ) - ), - List(EnrichmentFailure(None, FailureDetails.EnrichmentFailureMessage.Simple(err.toString))) - ), - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - RawEvent.toRawEvent(raw), - processor - ) - ) - .toEitherT[F] - .flatMap(atomic => - client - .check(SelfDescribingData(atomicSchema, atomic)) - .leftMap(err => - EnrichmentManager.buildEnrichmentFailuresBadRow( - NonEmptyList( - EnrichmentFailure( - None, - FailureDetails.EnrichmentFailureMessage.Simple( - s"Enriched event not valid against ${atomicSchema.toSchemaUri}" - ) - ), - List(EnrichmentFailure(None, FailureDetails.EnrichmentFailureMessage.IgluError(atomicSchema, err))) - ), - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - RawEvent.toRawEvent(raw), - processor - ) - ) - ) - .value - validation <- validated match { - case Left(br) if !acceptInvalid => - Monad[F].pure(Left(br)) - case Left(br) => - for { - _ <- invalidCount - _ <- Monad[F].pure(logger.debug(s"Enriched event not valid against atomic schema. Bad row: ${br.compact}")) - } yield Right(()) - case _ => - Monad[F].pure(Right(())) - } - } yield validation - ) + ): EitherT[F, BadRow, Unit] = EitherT { + + /** + * We're using static field's length validation. + * See more in https://github.com/snowplow/enrich/issues/608 + */ + AtomicFieldsLengthValidator.validate[F](enriched, raw, processor, acceptInvalid, invalidCount) + } } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala index d9fc78111..64dc01310 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala @@ -20,16 +20,10 @@ import cats.data.NonEmptyList import io.circe.literal._ import org.joda.time.DateTime import com.snowplowanalytics.snowplow.badrows._ -import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailureMessage import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer} import loaders._ import adapters.RawEvent -import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.pii.{ - JsonMutators, - PiiJson, - PiiPseudonymizerEnrichment, - PiiStrategyPseudonymize -} +import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.pii.{JsonMutators, PiiJson, PiiPseudonymizerEnrichment, PiiStrategyPseudonymize} import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import utils.Clock._ import utils.ConversionUtils @@ -38,6 +32,7 @@ import org.apache.commons.codec.digest.DigestUtils import org.specs2.mutable.Specification import org.specs2.matcher.EitherMatchers import SpecHelpers._ +import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailureMessage class EnrichmentManagerSpec extends Specification with EitherMatchers { import EnrichmentManagerSpec._ @@ -711,45 +706,42 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers { "validateEnriched" should { "create a bad row if a field is oversized" >> { - EnrichmentManager + val result = EnrichmentManager .enrichEvent[Id]( enrichmentReg, client, processor, timestamp, RawEvent(api, fatBody, None, source, context), - false, + acceptInvalid = false, AcceptInvalid.countInvalid - ) - .swap - .map { - case BadRow.EnrichmentFailures(_, failure, _) => - failure.messages.map(_.message match { - case EnrichmentFailureMessage.Simple(error) => error - case EnrichmentFailureMessage.IgluError(schemaKey, _) => schemaKey - case _ => None - }) - case _ => None - } - .getOrElse(None) === NonEmptyList( - s"Enriched event not valid against ${EnrichmentManager.atomicSchema.toSchemaUri}", - List(EnrichmentManager.atomicSchema) - ) + ).value + + result must beLeft.like { + case badRow: BadRow.EnrichmentFailures => + val firstError = badRow.failure.messages.head.message + val secondError = badRow.failure.messages.last.message + + firstError must beEqualTo(EnrichmentFailureMessage.Simple("Enriched event does not conform to atomic schema field's length restrictions")) + secondError must beEqualTo(EnrichmentFailureMessage.Simple("Field v_tracker longer than maximum allowed size 100")) + case br => + ko(s"bad row [$br] is not BadRow.EnrichmentFailures") + } } "not create a bad row if a field is oversized and acceptInvalid is set to true" >> { - EnrichmentManager + val result = EnrichmentManager .enrichEvent[Id]( enrichmentReg, client, processor, timestamp, RawEvent(api, fatBody, None, source, context), - true, + acceptInvalid = true, AcceptInvalid.countInvalid - ) - .map(_ => true) - .getOrElse(false) must beTrue + ).value + + result must beRight[EnrichedEvent] } } }