Skip to content

Commit

Permalink
common: update the validation of the enriched event with static field…
Browse files Browse the repository at this point in the history
… lengths (close #608)
  • Loading branch information
pondzix committed May 26, 2022
1 parent 8b55ec2 commit 4185987
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package com.snowplowanalytics.snowplow.enrich.common.enrichments

import cats.Monad
import cats.data.Validated.{Invalid, Valid}
import cats.data.{NonEmptyList, ValidatedNel}
import cats.implicits._
import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailure
import com.snowplowanalytics.snowplow.badrows.{BadRow, FailureDetails, Processor}
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import org.slf4j.LoggerFactory

/**
* Atomic fields length validation inspired by
* https://github.com/snowplow/snowplow-scala-analytics-sdk/blob/master/src/main/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/validate/package.scala
* */
object AtomicFieldsLengthValidator {

final case class AtomicField(name: String,
enrichedValueExtractor: EnrichedEvent => String,
maxLength: Int)

private val logger = LoggerFactory.getLogger("AtomicLengthValidator")

private val atomicFields =
List(
AtomicField(name = "app_id", _.app_id, maxLength = 255 ),
AtomicField(name = "platform", _.platform, maxLength = 255 ),
AtomicField(name = "event", _.event, maxLength = 128 ),
AtomicField(name = "event_id", _.event, maxLength = 36 ),
AtomicField(name = "name_tracker", _.name_tracker, maxLength = 128 ),
AtomicField(name = "v_tracker", _.v_tracker, maxLength = 100 ),
AtomicField(name = "v_collector", _.v_collector, maxLength = 100 ),
AtomicField(name = "v_etl", _.v_etl, maxLength = 100 ),
AtomicField(name = "user_id", _.user_id, maxLength = 255 ),
AtomicField(name = "user_ipaddress", _.user_ipaddress, maxLength = 128 ),
AtomicField(name = "user_fingerprint", _.user_fingerprint, maxLength = 128 ),
AtomicField(name = "domain_userid", _.domain_userid, maxLength = 128 ),
AtomicField(name = "network_userid", _.network_userid, maxLength = 128 ),
AtomicField(name = "geo_country", _.geo_country, maxLength = 2 ),
AtomicField(name = "geo_region", _.geo_region, maxLength = 3 ),
AtomicField(name = "geo_city", _.geo_city, maxLength = 75 ),
AtomicField(name = "geo_zipcode", _.geo_zipcode, maxLength = 15 ),
AtomicField(name = "geo_region_name", _.geo_region_name, maxLength = 100 ),
AtomicField(name = "ip_isp", _.ip_isp, maxLength = 100 ),
AtomicField(name = "ip_organization", _.ip_organization, maxLength = 128 ),
AtomicField(name = "ip_domain", _.ip_domain, maxLength = 128 ),
AtomicField(name = "ip_netspeed", _.ip_netspeed, maxLength = 100 ),
AtomicField(name = "page_url", _.page_url, maxLength = 4096),
AtomicField(name = "page_title", _.page_title, maxLength = 2000),
AtomicField(name = "page_referrer", _.page_referrer, maxLength = 4096),
AtomicField(name = "page_urlscheme", _.page_urlscheme, maxLength = 16 ),
AtomicField(name = "page_urlhost", _.page_urlhost, maxLength = 255 ),
AtomicField(name = "page_urlpath", _.page_urlpath, maxLength = 3000),
AtomicField(name = "page_urlquery", _.page_urlquery, maxLength = 6000),
AtomicField(name = "page_urlfragment", _.page_urlfragment, maxLength = 3000),
AtomicField(name = "refr_urlscheme", _.refr_urlscheme, maxLength = 16 ),
AtomicField(name = "refr_urlhost", _.refr_urlhost, maxLength = 255 ),
AtomicField(name = "refr_urlpath", _.refr_urlpath, maxLength = 6000),
AtomicField(name = "refr_urlquery", _.refr_urlquery, maxLength = 6000),
AtomicField(name = "refr_urlfragment", _.refr_urlfragment, maxLength = 3000),
AtomicField(name = "refr_medium", _.refr_medium, maxLength = 25 ),
AtomicField(name = "refr_source", _.refr_source, maxLength = 50 ),
AtomicField(name = "refr_term", _.refr_term, maxLength = 255 ),
AtomicField(name = "mkt_medium", _.mkt_medium, maxLength = 255 ),
AtomicField(name = "mkt_source", _.mkt_source, maxLength = 255 ),
AtomicField(name = "mkt_term", _.mkt_term, maxLength = 255 ),
AtomicField(name = "mkt_content", _.mkt_content, maxLength = 500 ),
AtomicField(name = "mkt_campaign", _.mkt_campaign, maxLength = 255 ),
AtomicField(name = "se_category", _.se_category, maxLength = 1000),
AtomicField(name = "se_action", _.se_action, maxLength = 1000),
AtomicField(name = "se_label", _.se_label, maxLength = 4096),
AtomicField(name = "se_property", _.se_property, maxLength = 1000),
AtomicField(name = "tr_orderid", _.tr_orderid, maxLength = 255 ),
AtomicField(name = "tr_affiliation", _.tr_affiliation, maxLength = 255 ),
AtomicField(name = "tr_city", _.tr_city, maxLength = 255 ),
AtomicField(name = "tr_state", _.tr_state, maxLength = 255 ),
AtomicField(name = "tr_country", _.tr_country, maxLength = 255 ),
AtomicField(name = "ti_orderid", _.ti_orderid, maxLength = 255 ),
AtomicField(name = "ti_sku", _.ti_sku, maxLength = 255 ),
AtomicField(name = "ti_name", _.ti_name, maxLength = 255 ),
AtomicField(name = "ti_category", _.ti_category, maxLength = 255 ),
AtomicField(name = "useragent", _.useragent, maxLength = 1000),
AtomicField(name = "br_name", _.br_name, maxLength = 50 ),
AtomicField(name = "br_family", _.br_family, maxLength = 50 ),
AtomicField(name = "br_version", _.br_version, maxLength = 50 ),
AtomicField(name = "br_type", _.br_type, maxLength = 50 ),
AtomicField(name = "br_renderengine", _.br_renderengine, maxLength = 50 ),
AtomicField(name = "br_lang", _.br_lang, maxLength = 255 ),
AtomicField(name = "br_colordepth", _.br_colordepth, maxLength = 12 ),
AtomicField(name = "os_name", _.os_name, maxLength = 50 ),
AtomicField(name = "os_family", _.os_family, maxLength = 50 ),
AtomicField(name = "os_manufacturer", _.os_manufacturer, maxLength = 50 ),
AtomicField(name = "os_timezone", _.os_timezone, maxLength = 255 ),
AtomicField(name = "dvce_type", _.dvce_type, maxLength = 50 ),
AtomicField(name = "doc_charset", _.doc_charset, maxLength = 128 ),
AtomicField(name = "tr_currency", _.tr_currency, maxLength = 3 ),
AtomicField(name = "ti_currency", _.ti_currency, maxLength = 3 ),
AtomicField(name = "base_currency", _.base_currency, maxLength = 3 ),
AtomicField(name = "geo_timezone", _.geo_timezone, maxLength = 64 ),
AtomicField(name = "mkt_clickid", _.mkt_clickid, maxLength = 128 ),
AtomicField(name = "mkt_network", _.mkt_network, maxLength = 64 ),
AtomicField(name = "etl_tags", _.etl_tags, maxLength = 500 ),
AtomicField(name = "refr_domain_userid", _.refr_domain_userid,maxLength = 128 ),
AtomicField(name = "domain_sessionid", _.domain_sessionid, maxLength = 128 ),
AtomicField(name = "event_vendor", _.event_vendor, maxLength = 1000),
AtomicField(name = "event_name", _.event_name, maxLength = 1000),
AtomicField(name = "event_format", _.event_format, maxLength = 128 ),
AtomicField(name = "event_version", _.event_version, maxLength = 128 ),
AtomicField(name = "event_fingerprint", _.event_fingerprint, maxLength = 128 )
)

def validate[F[_]: Monad](event: EnrichedEvent,
rawEvent: RawEvent,
processor: Processor,
acceptInvalid: Boolean,
invalidCount: F[Unit]): F[Either[BadRow, Unit]] =
atomicFields
.map(validateField(event))
.combineAll
.leftMap(buildBadRow(event, rawEvent, processor)) match {
case Invalid(badRow) if acceptInvalid =>
handleAcceptableBadRow(invalidCount, badRow) *> Monad[F].pure(Right(()))
case Invalid(badRow) =>
Monad[F].pure(Left(badRow))
case Valid(()) =>
Monad[F].pure(Right(()))
}

private def validateField(event: EnrichedEvent)
(atomicField: AtomicField): ValidatedNel[String, Unit] = {
val actualValue = atomicField.enrichedValueExtractor(event)
if (actualValue != null && actualValue.length > atomicField.maxLength) //TODO values can be null?
s"Field ${atomicField.name} longer than maximum allowed size ${atomicField.maxLength}".invalidNel
else
Valid(())
}

private def buildBadRow(event: EnrichedEvent,
rawEvent: RawEvent,
processor: Processor)
(errors: NonEmptyList[String]): BadRow.EnrichmentFailures = {
EnrichmentManager.buildEnrichmentFailuresBadRow(
NonEmptyList.of(

//TODO why 2 messages?
asEnrichmentFailure("Enriched event does not conform to atomic schema field's length restrictions"),
asEnrichmentFailure(errors.toList.mkString(","))
),
EnrichedEvent.toPartiallyEnrichedEvent(event),
RawEvent.toRawEvent(rawEvent),
processor
)
}

private def handleAcceptableBadRow[F[_]: Monad](invalidCount: F[Unit],
badRow: BadRow.EnrichmentFailures): F[Unit] =
invalidCount *>
Monad[F].pure(logger.debug(s"Enriched event not valid against atomic schema. Bad row: ${badRow.compact}"))


private def asEnrichmentFailure(errorMessage: String): EnrichmentFailure = {
EnrichmentFailure(
enrichment = None,
FailureDetails.EnrichmentFailureMessage.Simple(errorMessage)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import java.nio.charset.Charset
import java.net.URI
import java.time.Instant
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import io.circe.Json
import cats.Monad
import cats.data.{EitherT, NonEmptyList, OptionT, ValidatedNel}
Expand All @@ -29,12 +28,11 @@ import com.snowplowanalytics.refererparser._
import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.badrows._
import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processor}
import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailure

import adapters.RawEvent
import enrichments.{EventEnrichments => EE}
Expand All @@ -49,9 +47,6 @@ import utils.{IgluUtils, ConversionUtils => CU}

object EnrichmentManager {

val atomicSchema: SchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1, 0, 0))

private val logger = LoggerFactory.getLogger("InvalidEnriched")

/**
* Run the enrichment workflow
Expand Down Expand Up @@ -101,7 +96,7 @@ object EnrichmentManager {
enriched.pii = pii.asString
}
}
_ <- validateEnriched(enriched, raw, processor, client, acceptInvalid, invalidCount)
_ <- validateEnriched(enriched, raw, processor, acceptInvalid, invalidCount)
} yield enriched

/**
Expand Down Expand Up @@ -767,63 +762,14 @@ object EnrichmentManager {
enriched: EnrichedEvent,
raw: RawEvent,
processor: Processor,
client: Client[F, Json],
acceptInvalid: Boolean,
invalidCount: F[Unit]
): EitherT[F, BadRow, Unit] =
EitherT(
for {
validated <- EnrichedEvent
.toAtomic(enriched)
.leftMap(err =>
EnrichmentManager.buildEnrichmentFailuresBadRow(
NonEmptyList(
EnrichmentFailure(
None,
FailureDetails.EnrichmentFailureMessage.Simple(
"Error during conversion of enriched event to the atomic format"
)
),
List(EnrichmentFailure(None, FailureDetails.EnrichmentFailureMessage.Simple(err.toString)))
),
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
)
)
.toEitherT[F]
.flatMap(atomic =>
client
.check(SelfDescribingData(atomicSchema, atomic))
.leftMap(err =>
EnrichmentManager.buildEnrichmentFailuresBadRow(
NonEmptyList(
EnrichmentFailure(
None,
FailureDetails.EnrichmentFailureMessage.Simple(
s"Enriched event not valid against ${atomicSchema.toSchemaUri}"
)
),
List(EnrichmentFailure(None, FailureDetails.EnrichmentFailureMessage.IgluError(atomicSchema, err)))
),
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
)
)
)
.value
validation <- validated match {
case Left(br) if !acceptInvalid =>
Monad[F].pure(Left(br))
case Left(br) =>
for {
_ <- invalidCount
_ <- Monad[F].pure(logger.debug(s"Enriched event not valid against atomic schema. Bad row: ${br.compact}"))
} yield Right(())
case _ =>
Monad[F].pure(Right(()))
}
} yield validation
)
): EitherT[F, BadRow, Unit] = EitherT {

/**
* We're using static field's length validation.
* See more in https://github.com/snowplow/enrich/issues/608
*/
AtomicFieldsLengthValidator.validate[F](enriched, raw, processor, acceptInvalid, invalidCount)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,10 @@ import cats.data.NonEmptyList
import io.circe.literal._
import org.joda.time.DateTime
import com.snowplowanalytics.snowplow.badrows._
import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailureMessage
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer}
import loaders._
import adapters.RawEvent
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.pii.{
JsonMutators,
PiiJson,
PiiPseudonymizerEnrichment,
PiiStrategyPseudonymize
}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.pii.{JsonMutators, PiiJson, PiiPseudonymizerEnrichment, PiiStrategyPseudonymize}
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import utils.Clock._
import utils.ConversionUtils
Expand All @@ -38,6 +32,7 @@ import org.apache.commons.codec.digest.DigestUtils
import org.specs2.mutable.Specification
import org.specs2.matcher.EitherMatchers
import SpecHelpers._
import com.snowplowanalytics.snowplow.badrows.FailureDetails.EnrichmentFailureMessage

class EnrichmentManagerSpec extends Specification with EitherMatchers {
import EnrichmentManagerSpec._
Expand Down Expand Up @@ -711,45 +706,42 @@ class EnrichmentManagerSpec extends Specification with EitherMatchers {

"validateEnriched" should {
"create a bad row if a field is oversized" >> {
EnrichmentManager
val result = EnrichmentManager
.enrichEvent[Id](
enrichmentReg,
client,
processor,
timestamp,
RawEvent(api, fatBody, None, source, context),
false,
acceptInvalid = false,
AcceptInvalid.countInvalid
)
.swap
.map {
case BadRow.EnrichmentFailures(_, failure, _) =>
failure.messages.map(_.message match {
case EnrichmentFailureMessage.Simple(error) => error
case EnrichmentFailureMessage.IgluError(schemaKey, _) => schemaKey
case _ => None
})
case _ => None
}
.getOrElse(None) === NonEmptyList(
s"Enriched event not valid against ${EnrichmentManager.atomicSchema.toSchemaUri}",
List(EnrichmentManager.atomicSchema)
)
).value

result must beLeft.like {
case badRow: BadRow.EnrichmentFailures =>
val firstError = badRow.failure.messages.head.message
val secondError = badRow.failure.messages.last.message

firstError must beEqualTo(EnrichmentFailureMessage.Simple("Enriched event does not conform to atomic schema field's length restrictions"))
secondError must beEqualTo(EnrichmentFailureMessage.Simple("Field v_tracker longer than maximum allowed size 100"))
case br =>
ko(s"bad row [$br] is not BadRow.EnrichmentFailures")
}
}

"not create a bad row if a field is oversized and acceptInvalid is set to true" >> {
EnrichmentManager
val result = EnrichmentManager
.enrichEvent[Id](
enrichmentReg,
client,
processor,
timestamp,
RawEvent(api, fatBody, None, source, context),
true,
acceptInvalid = true,
AcceptInvalid.countInvalid
)
.map(_ => true)
.getOrElse(false) must beTrue
).value

result must beRight[EnrichedEvent]
}
}
}
Expand Down

0 comments on commit 4185987

Please sign in to comment.