diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala
new file mode 100644
index 000000000..61d794f62
--- /dev/null
+++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/AtomicFieldsLengthValidator.scala
@@ -0,0 +1,199 @@
+package com.snowplowanalytics.snowplow.enrich.common.enrichments
+
+import cats.Monad
+import cats.data.Validated.{Invalid, Valid}
+import cats.data.{NonEmptyList, ValidatedNel}
+import cats.implicits._
+import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailure
+import com.snowplowanalytics.snowplow.badrows.{BadRow, FailureDetails, Processor}
+import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
+import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
+import org.slf4j.LoggerFactory
+
+/**
+ * Atomic fields length validation inspired by
+ * https://github.com/snowplow/snowplow-scala-analytics-sdk/blob/master/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/validate/package.scala
+ *
+ * Every field listed in `atomicFields` is read from the enriched event and its length is
+ * compared against a hard-coded maximum. All violations are accumulated into a single bad row.
+ */
+object AtomicFieldsLengthValidator {
+
+  /**
+   * Length restriction for one field of the enriched event
+   * @param name Name of the field, used in the error message
+   * @param enrichedValueExtractor Reads the value of the field (possibly null) from the event
+   * @param maxLength Maximum length allowed for the value
+   */
+  final case class AtomicField(
+    name: String,
+    enrichedValueExtractor: EnrichedEvent => String,
+    maxLength: Int
+  )
+
+  private val logger = LoggerFactory.getLogger("AtomicLengthValidator")
+
+  // format: off
+  private val atomicFields =
+    List(
+      AtomicField(name = "app_id",             _.app_id,             maxLength = 255 ),
+      AtomicField(name = "platform",           _.platform,           maxLength = 255 ),
+      AtomicField(name = "event",              _.event,              maxLength = 128 ),
+      AtomicField(name = "event_id",           _.event_id,           maxLength = 36  ),
+      AtomicField(name = "name_tracker",       _.name_tracker,       maxLength = 128 ),
+      AtomicField(name = "v_tracker",          _.v_tracker,          maxLength = 100 ),
+      AtomicField(name = "v_collector",        _.v_collector,        maxLength = 100 ),
+      AtomicField(name = "v_etl",              _.v_etl,              maxLength = 100 ),
+      AtomicField(name = "user_id",            _.user_id,            maxLength = 255 ),
+      AtomicField(name = "user_ipaddress",     _.user_ipaddress,     maxLength = 128 ),
+      AtomicField(name = "user_fingerprint",   _.user_fingerprint,   maxLength = 128 ),
+      AtomicField(name = "domain_userid",      _.domain_userid,      maxLength = 128 ),
+      AtomicField(name = "network_userid",     _.network_userid,     maxLength = 128 ),
+      AtomicField(name = "geo_country",        _.geo_country,        maxLength = 2   ),
+      AtomicField(name = "geo_region",         _.geo_region,         maxLength = 3   ),
+      AtomicField(name = "geo_city",           _.geo_city,           maxLength = 75  ),
+      AtomicField(name = "geo_zipcode",        _.geo_zipcode,        maxLength = 15  ),
+      AtomicField(name = "geo_region_name",    _.geo_region_name,    maxLength = 100 ),
+      AtomicField(name = "ip_isp",             _.ip_isp,             maxLength = 100 ),
+      AtomicField(name = "ip_organization",    _.ip_organization,    maxLength = 128 ),
+      AtomicField(name = "ip_domain",          _.ip_domain,          maxLength = 128 ),
+      AtomicField(name = "ip_netspeed",        _.ip_netspeed,        maxLength = 100 ),
+      AtomicField(name = "page_url",           _.page_url,           maxLength = 4096),
+      AtomicField(name = "page_title",         _.page_title,         maxLength = 2000),
+      AtomicField(name = "page_referrer",      _.page_referrer,      maxLength = 4096),
+      AtomicField(name = "page_urlscheme",     _.page_urlscheme,     maxLength = 16  ),
+      AtomicField(name = "page_urlhost",       _.page_urlhost,       maxLength = 255 ),
+      AtomicField(name = "page_urlpath",       _.page_urlpath,       maxLength = 3000),
+      AtomicField(name = "page_urlquery",      _.page_urlquery,      maxLength = 6000),
+      AtomicField(name = "page_urlfragment",   _.page_urlfragment,   maxLength = 3000),
+      AtomicField(name = "refr_urlscheme",     _.refr_urlscheme,     maxLength = 16  ),
+      AtomicField(name = "refr_urlhost",       _.refr_urlhost,       maxLength = 255 ),
+      AtomicField(name = "refr_urlpath",       _.refr_urlpath,       maxLength = 6000),
+      AtomicField(name = "refr_urlquery",      _.refr_urlquery,      maxLength = 6000),
+      AtomicField(name = "refr_urlfragment",   _.refr_urlfragment,   maxLength = 3000),
+      AtomicField(name = "refr_medium",        _.refr_medium,        maxLength = 25  ),
+      AtomicField(name = "refr_source",        _.refr_source,        maxLength = 50  ),
+      AtomicField(name = "refr_term",          _.refr_term,          maxLength = 255 ),
+      AtomicField(name = "mkt_medium",         _.mkt_medium,         maxLength = 255 ),
+      AtomicField(name = "mkt_source",         _.mkt_source,         maxLength = 255 ),
+      AtomicField(name = "mkt_term",           _.mkt_term,           maxLength = 255 ),
+      AtomicField(name = "mkt_content",        _.mkt_content,        maxLength = 500 ),
+      AtomicField(name = "mkt_campaign",       _.mkt_campaign,       maxLength = 255 ),
+      AtomicField(name = "se_category",        _.se_category,        maxLength = 1000),
+      AtomicField(name = "se_action",          _.se_action,          maxLength = 1000),
+      AtomicField(name = "se_label",           _.se_label,           maxLength = 4096),
+      AtomicField(name = "se_property",        _.se_property,        maxLength = 1000),
+      AtomicField(name = "tr_orderid",         _.tr_orderid,         maxLength = 255 ),
+      AtomicField(name = "tr_affiliation",     _.tr_affiliation,     maxLength = 255 ),
+      AtomicField(name = "tr_city",            _.tr_city,            maxLength = 255 ),
+      AtomicField(name = "tr_state",           _.tr_state,           maxLength = 255 ),
+      AtomicField(name = "tr_country",         _.tr_country,         maxLength = 255 ),
+      AtomicField(name = "ti_orderid",         _.ti_orderid,         maxLength = 255 ),
+      AtomicField(name = "ti_sku",             _.ti_sku,             maxLength = 255 ),
+      AtomicField(name = "ti_name",            _.ti_name,            maxLength = 255 ),
+      AtomicField(name = "ti_category",        _.ti_category,        maxLength = 255 ),
+      AtomicField(name = "useragent",          _.useragent,          maxLength = 1000),
+      AtomicField(name = "br_name",            _.br_name,            maxLength = 50  ),
+      AtomicField(name = "br_family",          _.br_family,          maxLength = 50  ),
+      AtomicField(name = "br_version",         _.br_version,         maxLength = 50  ),
+      AtomicField(name = "br_type",            _.br_type,            maxLength = 50  ),
+      AtomicField(name = "br_renderengine",    _.br_renderengine,    maxLength = 50  ),
+      AtomicField(name = "br_lang",            _.br_lang,            maxLength = 255 ),
+      AtomicField(name = "br_colordepth",      _.br_colordepth,      maxLength = 12  ),
+      AtomicField(name = "os_name",            _.os_name,            maxLength = 50  ),
+      AtomicField(name = "os_family",          _.os_family,          maxLength = 50  ),
+      AtomicField(name = "os_manufacturer",    _.os_manufacturer,    maxLength = 50  ),
+      AtomicField(name = "os_timezone",        _.os_timezone,        maxLength = 255 ),
+      AtomicField(name = "dvce_type",          _.dvce_type,          maxLength = 50  ),
+      AtomicField(name = "doc_charset",        _.doc_charset,        maxLength = 128 ),
+      AtomicField(name = "tr_currency",        _.tr_currency,        maxLength = 3   ),
+      AtomicField(name = "ti_currency",        _.ti_currency,        maxLength = 3   ),
+      AtomicField(name = "base_currency",      _.base_currency,      maxLength = 3   ),
+      AtomicField(name = "geo_timezone",       _.geo_timezone,       maxLength = 64  ),
+      AtomicField(name = "mkt_clickid",        _.mkt_clickid,        maxLength = 128 ),
+      AtomicField(name = "mkt_network",        _.mkt_network,        maxLength = 64  ),
+      AtomicField(name = "etl_tags",           _.etl_tags,           maxLength = 500 ),
+      AtomicField(name = "refr_domain_userid", _.refr_domain_userid, maxLength = 128 ),
+      AtomicField(name = "domain_sessionid",   _.domain_sessionid,   maxLength = 128 ),
+      AtomicField(name = "event_vendor",       _.event_vendor,       maxLength = 1000),
+      AtomicField(name = "event_name",         _.event_name,         maxLength = 1000),
+      AtomicField(name = "event_format",       _.event_format,       maxLength = 128 ),
+      AtomicField(name = "event_version",      _.event_version,      maxLength = 128 ),
+      AtomicField(name = "event_fingerprint",  _.event_fingerprint,  maxLength = 128 )
+    )
+  // format: on
+
+  /**
+   * Check the length of every field listed in `atomicFields`
+   * @param event Enriched event to validate
+   * @param rawEvent Raw event, used to build the bad row
+   * @param processor Processor, used to build the bad row
+   * @param acceptInvalid Whether an event with oversized fields is accepted anyway, in which
+   *        case `invalidCount` is run and the bad row is only logged at debug level
+   * @param invalidCount Effect run when an invalid event is accepted
+   * @return Left with a bad row if a field is oversized and `acceptInvalid` is false,
+   *         Right otherwise
+   */
+  def validate[F[_]: Monad](
+    event: EnrichedEvent,
+    rawEvent: RawEvent,
+    processor: Processor,
+    acceptInvalid: Boolean,
+    invalidCount: F[Unit]
+  ): F[Either[BadRow, Unit]] =
+    atomicFields
+      .map(validateField(event))
+      .combineAll
+      .leftMap(buildBadRow(event, rawEvent, processor)) match {
+      case Invalid(badRow) if acceptInvalid =>
+        handleAcceptableBadRow(invalidCount, badRow) *> Monad[F].pure(Right(()))
+      case Invalid(badRow) =>
+        Monad[F].pure(Left(badRow))
+      case Valid(()) =>
+        Monad[F].pure(Right(()))
+    }
+
+  /** Valid if the value is null or not longer than its maximum, an error message otherwise */
+  private def validateField(event: EnrichedEvent)(atomicField: AtomicField): ValidatedNel[String, Unit] = {
+    val actualValue = atomicField.enrichedValueExtractor(event)
+    if (actualValue != null && actualValue.length > atomicField.maxLength)
+      s"Field ${atomicField.name} longer than maximum allowed size ${atomicField.maxLength}".invalidNel
+    else
+      Valid(())
+  }
+
+  /** Build a bad row with a generic first message followed by one message per oversized field */
+  private def buildBadRow(
+    event: EnrichedEvent,
+    rawEvent: RawEvent,
+    processor: Processor
+  )(
+    errors: NonEmptyList[String]
+  ): BadRow.EnrichmentFailures =
+    EnrichmentManager.buildEnrichmentFailuresBadRow(
+      NonEmptyList(
+        asEnrichmentFailure("Enriched event does not conform to atomic schema field's length restrictions"),
+        errors.toList.map(asEnrichmentFailure)
+      ),
+      EnrichedEvent.toPartiallyEnrichedEvent(event),
+      RawEvent.toRawEvent(rawEvent),
+      processor
+    )
+
+  /**
+   * Run `invalidCount`, then log the accepted bad row at debug level.
+   * The log call sits inside `map` so that it is sequenced after `invalidCount`
+   * as part of the returned effect, instead of being evaluated while the effect is built.
+   */
+  private def handleAcceptableBadRow[F[_]: Monad](invalidCount: F[Unit], badRow: BadRow.EnrichmentFailures): F[Unit] =
+    invalidCount.map { _ =>
+      logger.debug(s"Enriched event not valid against atomic schema. Bad row: ${badRow.compact}")
+    }
+
+  /** Wrap an error message into an enrichment failure that is not attached to any enrichment */
+  private def asEnrichmentFailure(errorMessage: String): EnrichmentFailure =
+    EnrichmentFailure(
+      enrichment = None,
+      FailureDetails.EnrichmentFailureMessage.Simple(errorMessage)
+    )
+}
diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala
index f3cdf264b..254e07ee4 100644
--- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala
+++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala
@@ -17,7 +17,6 @@ import java.nio.charset.Charset
 import java.net.URI
 import java.time.Instant
 import org.joda.time.DateTime
-import org.slf4j.LoggerFactory
 import io.circe.Json
 import cats.Monad
 import cats.data.{EitherT, NonEmptyList, OptionT, ValidatedNel}
@@ -29,12 +28,11 @@
import com.snowplowanalytics.refererparser._ import com.snowplowanalytics.iglu.client.Client import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup -import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.iglu.core.SelfDescribingData import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.badrows._ import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processor} -import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailure import adapters.RawEvent import enrichments.{EventEnrichments => EE} @@ -49,10 +47,6 @@ import utils.{IgluUtils, ConversionUtils => CU} object EnrichmentManager { - val atomicSchema: SchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1, 0, 0)) - - private val logger = LoggerFactory.getLogger("InvalidEnriched") - /** * Run the enrichment workflow * @param registry Contain configuration for all enrichments to apply @@ -101,7 +95,7 @@ object EnrichmentManager { enriched.pii = pii.asString } } - _ <- validateEnriched(enriched, raw, processor, client, acceptInvalid, invalidCount) + _ <- validateEnriched(enriched, raw, processor, acceptInvalid, invalidCount) } yield enriched /** @@ -767,63 +761,12 @@ object EnrichmentManager { enriched: EnrichedEvent, raw: RawEvent, processor: Processor, - client: Client[F, Json], acceptInvalid: Boolean, invalidCount: F[Unit] ): EitherT[F, BadRow, Unit] = - EitherT( - for { - validated <- EnrichedEvent - .toAtomic(enriched) - .leftMap(err => - EnrichmentManager.buildEnrichmentFailuresBadRow( - NonEmptyList( - EnrichmentFailure( - None, - FailureDetails.EnrichmentFailureMessage.Simple( - "Error during conversion of enriched event to the atomic format" - ) - ), - List(EnrichmentFailure(None, FailureDetails.EnrichmentFailureMessage.Simple(err.toString))) - ), - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - 
RawEvent.toRawEvent(raw), - processor - ) - ) - .toEitherT[F] - .flatMap(atomic => - client - .check(SelfDescribingData(atomicSchema, atomic)) - .leftMap(err => - EnrichmentManager.buildEnrichmentFailuresBadRow( - NonEmptyList( - EnrichmentFailure( - None, - FailureDetails.EnrichmentFailureMessage.Simple( - s"Enriched event not valid against ${atomicSchema.toSchemaUri}" - ) - ), - List(EnrichmentFailure(None, FailureDetails.EnrichmentFailureMessage.IgluError(atomicSchema, err))) - ), - EnrichedEvent.toPartiallyEnrichedEvent(enriched), - RawEvent.toRawEvent(raw), - processor - ) - ) - ) - .value - validation <- validated match { - case Left(br) if !acceptInvalid => - Monad[F].pure(Left(br)) - case Left(br) => - for { - _ <- invalidCount - _ <- Monad[F].pure(logger.debug(s"Enriched event not valid against atomic schema. Bad row: ${br.compact}")) - } yield Right(()) - case _ => - Monad[F].pure(Right(())) - } - } yield validation - ) + EitherT { + + //We're using static field's length validation. 
See more in https://github.com/snowplow/enrich/issues/608 + AtomicFieldsLengthValidator.validate[F](enriched, raw, processor, acceptInvalid, invalidCount) + } } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala index d9fc78111..47662ede4 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/EnrichmentManagerSpec.scala @@ -20,7 +20,6 @@ import cats.data.NonEmptyList import io.circe.literal._ import org.joda.time.DateTime import com.snowplowanalytics.snowplow.badrows._ -import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailureMessage import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer} import loaders._ import adapters.RawEvent @@ -38,6 +37,7 @@ import org.apache.commons.codec.digest.DigestUtils import org.specs2.mutable.Specification import org.specs2.matcher.EitherMatchers import SpecHelpers._ +import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailureMessage class EnrichmentManagerSpec extends Specification with EitherMatchers { import EnrichmentManagerSpec._ @@ -711,45 +711,46 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers { "validateEnriched" should { "create a bad row if a field is oversized" >> { - EnrichmentManager + val result = EnrichmentManager .enrichEvent[Id]( enrichmentReg, client, processor, timestamp, RawEvent(api, fatBody, None, source, context), - false, + acceptInvalid = false, AcceptInvalid.countInvalid ) - .swap - .map { - case BadRow.EnrichmentFailures(_, failure, _) => - failure.messages.map(_.message match { - case EnrichmentFailureMessage.Simple(error) => error - case 
EnrichmentFailureMessage.IgluError(schemaKey, _) => schemaKey - case _ => None - }) - case _ => None - } - .getOrElse(None) === NonEmptyList( - s"Enriched event not valid against ${EnrichmentManager.atomicSchema.toSchemaUri}", - List(EnrichmentManager.atomicSchema) - ) + .value + + result must beLeft.like { + case badRow: BadRow.EnrichmentFailures => + val firstError = badRow.failure.messages.head.message + val secondError = badRow.failure.messages.last.message + + firstError must beEqualTo( + EnrichmentFailureMessage.Simple("Enriched event does not conform to atomic schema field's length restrictions") + ) + secondError must beEqualTo(EnrichmentFailureMessage.Simple("Field v_tracker longer than maximum allowed size 100")) + case br => + ko(s"bad row [$br] is not BadRow.EnrichmentFailures") + } } "not create a bad row if a field is oversized and acceptInvalid is set to true" >> { - EnrichmentManager + val result = EnrichmentManager .enrichEvent[Id]( enrichmentReg, client, processor, timestamp, RawEvent(api, fatBody, None, source, context), - true, + acceptInvalid = true, AcceptInvalid.countInvalid ) - .map(_ => true) - .getOrElse(false) must beTrue + .value + + result must beRight[EnrichedEvent] } } }