Skip to content

Commit

Permalink
Enrichment to enforce maximum field lengths (close #517)
Browse files Browse the repository at this point in the history
  • Loading branch information
voropaevp committed Dec 3, 2021
1 parent 0e53d3e commit 1fe5371
Show file tree
Hide file tree
Showing 10 changed files with 983 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import java.util.concurrent.TimeUnit

import cats.effect.{ContextShift, IO, Clock, Blocker}

import io.circe.literal._

import fs2.Stream

import com.snowplowanalytics.iglu.client.Client
Expand All @@ -43,10 +45,41 @@ class EnrichBench {

implicit val ioClock: Clock[IO] = Clock.create[IO]

// Iglu client shared by the benchmarks, built from an inline resolver configuration
// (cacheSize 500) that lists two HTTP repositories, "Iglu Central" (priority 0) and
// "Iglu Central - GCP Mirror" (priority 1), both declaring the "com.snowplowanalytics"
// vendor prefix.
// `rethrowT` raises a failed configuration parse as an error inside IO, and
// `unsafeRunSync()` evaluates the IO right here, so a bad configuration throws as soon
// as this val is initialised.
val client = Client.parseDefault[IO](json"""
{
"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
"data": {
"cacheSize": 500,
"repositories": [
{
"name": "Iglu Central",
"priority": 0,
"vendorPrefixes": [ "com.snowplowanalytics" ],
"connection": {
"http": {
"uri": "http://iglucentral.com"
}
}
},
{
"name": "Iglu Central - GCP Mirror",
"priority": 1,
"vendorPrefixes": [ "com.snowplowanalytics" ],
"connection": {
"http": {
"uri": "http://mirror01.iglucentral.com"
}
}
}
]
}
}
""").rethrowT.unsafeRunSync()

@Benchmark
def measureEnrichWithMinimalPayload(state: EnrichBench.BenchState) = {
implicit val CS: ContextShift[IO] = state.contextShift
Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), state.blocker, Client.IgluCentral, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync()
Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), client, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync()
}

@Benchmark
Expand Down Expand Up @@ -83,19 +116,19 @@ object EnrichBench {
raw = EnrichSpec.payload[IO]

val input = Stream.emits(List(
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.40") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.41") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.42") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.43") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.44") :: EnrichSpec.querystring
),
)).repeatN(10).map(cp => Payload(cp.toRaw, IO.unit)).covary[IO]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ class EtlPipelineBench {

@Benchmark
def measureProcessEventsIO(state: EtlPipelineBench.BenchState) = {
val payload = EnrichSpec.colllectorPayload
val payload = EnrichSpec.collectorPayload
EtlPipeline.processEvents[IO](state.adapterRegistry, state.enrichmentRegistryIo, Client.IgluCentral, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))).unsafeRunSync()
}

@Benchmark
def measureProcessEventsId(state: EtlPipelineBench.BenchState) = {
val payload = EnrichSpec.colllectorPayload
val payload = EnrichSpec.collectorPayload
EtlPipeline.processEvents[Id](state.adapterRegistry, state.enrichmentRegistryId, state.clientId, Enrich.processor, state.dateTime, Validated.Valid(Some(payload)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ThriftLoaderBench {

@Benchmark
def measureNormalize(state: ThriftLoaderBench.BenchState) = {
Enrich.encodeEvent(state.event)
Enrich.serializeEnriched(state.event)
}
}

Expand All @@ -42,7 +42,7 @@ object ThriftLoaderBench {

@Setup(Level.Trial)
def setup(): Unit = {
data = EnrichSpec.colllectorPayload.toRaw
data = EnrichSpec.collectorPayload.toRaw

event = new EnrichedEvent()
event.setApp_id("foo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ object EnrichmentManager {
enriched.pii = pii.asString
}
}
_ <- registry.shreddedValidator
.map(_.performCheck(enriched, raw, processor, client))
.getOrElse(EitherT.rightT[F, BadRow](()))
} yield enriched

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ object EnrichmentRegistry {
registry <- er
} yield registry.copy(apiRequest = enrichment.some)
case c: PiiPseudonymizerConf => er.map(_.copy(piiPseudonymizer = c.enrichment.some))
case c: ShreddedValidatorEnrichmentConf => er.map(_.copy(shreddedValidator = c.enrichment.some))
case c: SqlQueryConf =>
for {
enrichment <- EitherT.right(c.enrichment[F](blocker))
Expand Down Expand Up @@ -185,6 +186,8 @@ object EnrichmentRegistry {
IpLookupsEnrichment.parse(enrichmentConfig, schemaKey, localMode).map(_.some)
else if (nm == "anon_ip")
AnonIpEnrichment.parse(enrichmentConfig, schemaKey).map(_.some)
else if (nm == "shredded_validator_config")
ShreddedValidatorEnrichment.parse(enrichmentConfig, schemaKey).map(_.some)
else if (nm == "referer_parser")
RefererParserEnrichment.parse(enrichmentConfig, schemaKey, localMode).map(_.some)
else if (nm == "campaign_attribution")
Expand Down Expand Up @@ -234,21 +237,22 @@ object EnrichmentRegistry {

/** A registry to hold all of our enrichments. */
final case class EnrichmentRegistry[F[_]](
apiRequest: Option[ApiRequestEnrichment[F]] = None,
piiPseudonymizer: Option[PiiPseudonymizerEnrichment] = None,
sqlQuery: Option[SqlQueryEnrichment[F]] = None,
anonIp: Option[AnonIpEnrichment] = None,
campaignAttribution: Option[CampaignAttributionEnrichment] = None,
cookieExtractor: Option[CookieExtractorEnrichment] = None,
currencyConversion: Option[CurrencyConversionEnrichment[F]] = None,
eventFingerprint: Option[EventFingerprintEnrichment] = None,
httpHeaderExtractor: Option[HttpHeaderExtractorEnrichment] = None,
iab: Option[IabEnrichment] = None,
ipLookups: Option[IpLookupsEnrichment[F]] = None,
javascriptScript: Option[JavascriptScriptEnrichment] = None,
refererParser: Option[RefererParserEnrichment] = None,
uaParser: Option[UaParserEnrichment] = None,
userAgentUtils: Option[UserAgentUtilsEnrichment] = None,
weather: Option[WeatherEnrichment[F]] = None,
yauaa: Option[YauaaEnrichment] = None
apiRequest: Option[ApiRequestEnrichment[F]] = None,
piiPseudonymizer: Option[PiiPseudonymizerEnrichment] = None,
shreddedValidator: Option[ShreddedValidatorEnrichment] = Some(ShreddedValidatorEnrichment()),
sqlQuery: Option[SqlQueryEnrichment[F]] = None,
anonIp: Option[AnonIpEnrichment] = None,
campaignAttribution: Option[CampaignAttributionEnrichment] = None,
cookieExtractor: Option[CookieExtractorEnrichment] = None,
currencyConversion: Option[CurrencyConversionEnrichment[F]] = None,
eventFingerprint: Option[EventFingerprintEnrichment] = None,
httpHeaderExtractor: Option[HttpHeaderExtractorEnrichment] = None,
iab: Option[IabEnrichment] = None,
ipLookups: Option[IpLookupsEnrichment[F]] = None,
javascriptScript: Option[JavascriptScriptEnrichment] = None,
refererParser: Option[RefererParserEnrichment] = None,
uaParser: Option[UaParserEnrichment] = None,
userAgentUtils: Option[UserAgentUtilsEnrichment] = None,
weather: Option[WeatherEnrichment[F]] = None,
yauaa: Option[YauaaEnrichment] = None
)
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ object EnrichmentConf {
def enrichment: UserAgentUtilsEnrichment = UserAgentUtilsEnrichment(schemaKey)
}

/**
 * Configuration for the shredded validator enrichment.
 *
 * @param schemaKey schema key supplied together with the enrichment configuration
 * @param disable flag handed unchanged to the enrichment built by [[enrichment]]
 */
final case class ShreddedValidatorEnrichmentConf(
  schemaKey: SchemaKey,
  disable: Boolean
) extends EnrichmentConf {

  /** Builds the enrichment; only the `disable` flag is carried over. */
  def enrichment: ShreddedValidatorEnrichment =
    ShreddedValidatorEnrichment(disable = disable)
}

final case class WeatherConf(
schemaKey: SchemaKey,
apiHost: String,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2012-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry

import cats.Monad
import cats.data.{EitherT, NonEmptyList, ValidatedNel}
import cats.effect.Clock
import cats.implicits._
import io.circe.Json
import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.badrows.FailureDetails.{EnrichmentFailureMessage, EnrichmentInformation}
import com.snowplowanalytics.snowplow.badrows.{BadRow, FailureDetails, Processor}
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent.toRawEvent
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentManager
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ShreddedValidatorEnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent.toPartiallyEnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.IgluUtils.buildSchemaViolationsBadRow

/** Configuration parser for the shredded validator enrichment. */
object ShreddedValidatorEnrichment extends ParseableEnrichment {
  override val supportedSchema: SchemaCriterion =
    SchemaCriterion(
      "com.snowplowanalytics.snowplow.enrichments",
      "shredded_validator_config",
      "jsonschema",
      1,
      0,
      0
    )

  /**
   * Creates a ShreddedValidatorEnrichmentConf from a Json.
   * @param config The shredded_validator_config enrichment JSON; its optional boolean
   *        "disable" field falls back to false when it is not provided
   * @param schemaKey provided for the enrichment, must be supported by this enrichment
   * @param localMode not used by this enrichment
   * @return a ShreddedValidatorEnrichmentConf, or the error message explaining why the
   *         configuration was rejected
   */
  override def parse(
    config: Json,
    schemaKey: SchemaKey,
    localMode: Boolean = false
  ): ValidatedNel[String, ShreddedValidatorEnrichmentConf] = {
    val parsed =
      for {
        // Reject configurations whose schema key is not supported before reading any field.
        _ <- isParseable(config, schemaKey)
        disable <- config.hcursor
                     .getOrElse[Boolean]("disable")(false)
                     .leftMap(_.getMessage())
      } yield ShreddedValidatorEnrichmentConf(schemaKey, disable)

    parsed.toValidatedNel
  }
}

/**
 * Checks an enriched event, in its shredded form, against the atomic schema.
 *
 * @param disable when true, [[performCheck]] succeeds without inspecting the event
 */
case class ShreddedValidatorEnrichment(disable: Boolean = false) extends Enrichment {

  /** Schema the shredded event is checked against; also reported in the failure details. */
  def schemaKey: SchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1, 0, 0))

  /**
   * Converts the enriched event with `toShreddedEvent` and checks the result against
   * [[schemaKey]] using the Iglu client.
   *
   * @param e enriched event to check
   * @param re raw event the enriched event originates from, embedded in any bad row
   * @param processor processor information embedded in any bad row
   * @param client Iglu client performing the schema check
   * @return unit on success (or when the check is disabled); an enrichment-failures bad
   *         row when the event cannot be converted; a schema-violations bad row when the
   *         client check fails
   */
  def performCheck[F[_]](
    e: EnrichedEvent,
    re: RawEvent,
    processor: Processor,
    client: Client[F, Json]
  )(
    implicit
    M: Monad[F],
    L: RegistryLookup[F],
    C: Clock[F]
  ): EitherT[F, BadRow, Unit] =
    if (disable)
      // Nothing to verify when the check is switched off.
      EitherT.rightT(())
    else
      e.toShreddedEvent
        .leftMap { failure =>
          val details = FailureDetails.EnrichmentFailure(
            Some(EnrichmentInformation(schemaKey, identifier = "shredded_validator")),
            EnrichmentFailureMessage.Simple(failure.getMessage)
          )
          EnrichmentManager.buildEnrichmentFailuresBadRow(
            NonEmptyList.one(details),
            toPartiallyEnrichedEvent(e),
            toRawEvent(re),
            processor
          )
        }
        .toEitherT[F]
        .flatMap { shredded =>
          client
            .check(SelfDescribingData(schemaKey, shredded))
            .leftMap { clientError =>
              val violation: FailureDetails.SchemaViolation =
                FailureDetails.SchemaViolation.IgluError(schemaKey, clientError)
              buildSchemaViolationsBadRow(
                NonEmptyList.one(violation),
                toPartiallyEnrichedEvent(e),
                toRawEvent(re),
                processor
              )
            }
        }
}
Loading

0 comments on commit 1fe5371

Please sign in to comment.