Skip to content

Commit

Permalink
Validate enriched event against atomic schema before emitting (close #…
Browse files Browse the repository at this point in the history
  • Loading branch information
voropaevp authored and benjben committed Feb 1, 2022
1 parent f2d31cd commit 8a8476b
Show file tree
Hide file tree
Showing 78 changed files with 5,651 additions and 522 deletions.
7 changes: 7 additions & 0 deletions config/config.kinesis.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -389,4 +389,11 @@
# Version of the terraform module that deployed the app
moduleVersion = 1.0.0
}

# Optional. To activate/deactive enrich features
"featureFlags" : {
# If activated, enriched events will get validated against atomic schema.
# If not valid, a bad row will be emitted instead of the enriched event.
"validateEnrichedEvents": true
}
}
7 changes: 7 additions & 0 deletions config/config.pubsub.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,11 @@
# Version of the terraform module that deployed the app
moduleVersion = 1.0.0
}

# Optional. To activate/deactive enrich features
"featureFlags" : {
# If activated, enriched events will get validated against atomic schema.
# If not valid, a bad row will be emitted instead of the enriched event.
"validateEnrichedEvents": true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import java.util.concurrent.TimeUnit

import cats.effect.{ContextShift, IO, Clock, Blocker}

import io.circe.literal._

import fs2.Stream

import com.snowplowanalytics.iglu.client.Client
Expand All @@ -43,10 +45,41 @@ class EnrichBench {

implicit val ioClock: Clock[IO] = Clock.create[IO]

val client = Client.parseDefault[IO](json"""
{
"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
"data": {
"cacheSize": 500,
"repositories": [
{
"name": "Iglu Central",
"priority": 0,
"vendorPrefixes": [ "com.snowplowanalytics" ],
"connection": {
"http": {
"uri": "http://iglucentral.com"
}
}
},
{
"name": "Iglu Central - GCP Mirror",
"priority": 1,
"vendorPrefixes": [ "com.snowplowanalytics" ],
"connection": {
"http": {
"uri": "http://mirror01.iglucentral.com"
}
}
}
]
}
}
""").rethrowT.unsafeRunSync()

@Benchmark
def measureEnrichWithMinimalPayload(state: EnrichBench.BenchState) = {
implicit val CS: ContextShift[IO] = state.contextShift
Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), state.blocker, Client.IgluCentral, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync()
Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), client, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync()
}

@Benchmark
Expand Down Expand Up @@ -83,19 +116,19 @@ object EnrichBench {
raw = EnrichSpec.payload[IO]

val input = Stream.emits(List(
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.40") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.41") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.42") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.43") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.44") :: EnrichSpec.querystring
),
)).repeatN(10).map(cp => Payload(cp.toRaw, IO.unit)).covary[IO]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ class EtlPipelineBench {

@Benchmark
def measureProcessEventsIO(state: EtlPipelineBench.BenchState) = {
val payload = EnrichSpec.colllectorPayload
val payload = EnrichSpec.collectorPayload
EtlPipeline.processEvents[IO](state.adapterRegistry, state.enrichmentRegistryIo, Client.IgluCentral, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))).unsafeRunSync()
}

@Benchmark
def measureProcessEventsId(state: EtlPipelineBench.BenchState) = {
val payload = EnrichSpec.colllectorPayload
val payload = EnrichSpec.collectorPayload
EtlPipeline.processEvents[Id](state.adapterRegistry, state.enrichmentRegistryId, state.clientId, Enrich.processor, state.dateTime, Validated.Valid(Some(payload)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ThriftLoaderBench {

@Benchmark
def measureNormalize(state: ThriftLoaderBench.BenchState) = {
Enrich.encodeEvent(state.event)
Enrich.serializeEnriched(state.event)
}
}

Expand All @@ -42,7 +42,7 @@ object ThriftLoaderBench {

@Setup(Level.Trial)
def setup(): Unit = {
data = EnrichSpec.colllectorPayload.toRaw
data = EnrichSpec.collectorPayload.toRaw

event = new EnrichedEvent()
event.setApp_id("foo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ object Enrich {
val registry: F[EnrichmentRegistry[F]] = env.enrichments.get.map(_.registry)
val enrich: Enrich[F] = {
implicit val rl: RegistryLookup[F] = env.registryLookup
enrichWith[F](registry, env.igluClient, env.sentry, env.processor)
enrichWith[F](registry, env.igluClient, env.sentry, env.processor, env.validateEnriched)
}

val enriched =
Expand Down Expand Up @@ -101,7 +101,8 @@ object Enrich {
enrichRegistry: F[EnrichmentRegistry[F]],
igluClient: Client[F, Json],
sentry: Option[SentryClient],
processor: Processor
processor: Processor,
validateEnriched: Boolean
)(
row: Array[Byte]
): F[Result] = {
Expand All @@ -113,7 +114,7 @@ object Enrich {
_ <- Logger[F].debug(payloadToString(payload))
etlTstamp <- Clock[F].realTime(TimeUnit.MILLISECONDS).map(millis => new DateTime(millis))
registry <- enrichRegistry
enriched <- EtlPipeline.processEvents[F](adapterRegistry, registry, igluClient, processor, etlTstamp, payload)
enriched <- EtlPipeline.processEvents[F](adapterRegistry, registry, igluClient, processor, etlTstamp, payload, validateEnriched)
} yield (enriched, collectorTstamp)

result.handleErrorWith(sendToSentry[F](row, sentry, processor, collectorTstamp))
Expand Down Expand Up @@ -153,7 +154,7 @@ object Enrich {
): BadRow.GenericError = {
val base64 = new String(Base64.getEncoder.encode(row))
val rawPayload = BadRowPayload.RawPayload(base64)
val failure = Failure.GenericFailure(time, NonEmptyList.one(error.toString))
val failure = Failure.GenericFailure(time, NonEmptyList.one(ConversionUtils.cleanStackTrace(error)))
BadRow.GenericError(processor, failure, rawPayload)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input.Kinesis
* @param streamsSettings parameters used to configure the streams
* @param region region in the cloud where enrich runs
* @param cloud cloud where enrich runs (AWS or GCP)
* @param validateEnriched Whether enriched event should be validated according
* to atomic schema
* @tparam A type emitted by the source (e.g. `ConsumerRecord` for PubSub).
* getPayload must be defined for this type, as well as checkpointing
*/
Expand All @@ -103,7 +105,8 @@ final case class Environment[F[_], A](
processor: Processor,
streamsSettings: Environment.StreamsSettings,
region: Option[String],
cloud: Option[Telemetry.Cloud]
cloud: Option[Telemetry.Cloud],
validateEnriched: Boolean
)

object Environment {
Expand Down Expand Up @@ -147,7 +150,8 @@ object Environment {
processor: Processor,
maxRecordSize: Int,
cloud: Option[Telemetry.Cloud],
getRegion: => Option[String]
getRegion: => Option[String],
validateEnriched: Boolean
): Resource[F, Environment[F, A]] = {
val file = parsedConfigs.configFile
for {
Expand Down Expand Up @@ -189,7 +193,8 @@ object Environment {
processor,
StreamsSettings(file.concurrency, maxRecordSize),
getRegionFromConfig(file).orElse(getRegion),
cloud
cloud,
validateEnriched
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ object Run {
processor,
maxRecordSize,
cloud,
getRegion
getRegion,
file.featureFlags.validateEnrichedEvents
)
runEnvironment[F, Array[Byte]](env)
case _ =>
Expand All @@ -116,7 +117,8 @@ object Run {
processor,
maxRecordSize,
cloud,
getRegion
getRegion,
file.featureFlags.validateEnrichedEvents
)
runEnvironment[F, A](env)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import pureconfig.ConfigSource
import pureconfig.module.catseffect.syntax._
import pureconfig.module.circe._

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Input, Monitoring, Output, Outputs, Telemetry}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, FeatureFlags, Input, Monitoring, Output, Outputs, Telemetry}

/**
* Parsed HOCON configuration file
Expand All @@ -37,14 +37,16 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency,
* @param assetsUpdatePeriod time after which assets should be updated, in minutes
* @param monitoring configuration for sentry and metrics
* @param telemetry configuration for telemetry
* @param featureFlags to activate/deactivate enrich features
*/
final case class ConfigFile(
input: Input,
output: Outputs,
concurrency: Concurrency,
assetsUpdatePeriod: Option[FiniteDuration],
monitoring: Option[Monitoring],
telemetry: Telemetry
telemetry: Telemetry,
featureFlags: FeatureFlags
)

object ConfigFile {
Expand All @@ -55,13 +57,13 @@ object ConfigFile {

implicit val configFileDecoder: Decoder[ConfigFile] =
deriveConfiguredDecoder[ConfigFile].emap {
case ConfigFile(_, _, _, Some(aup), _, _) if aup._1 <= 0L =>
case ConfigFile(_, _, _, Some(aup), _, _, _) if aup._1 <= 0L =>
"assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype
// Remove pii output if streamName and region empty
case c @ ConfigFile(_, Outputs(good, Some(Output.Kinesis(s, _, _, _, _, _, _, _, _, _, _, _, _)), bad), _, _, _, _) if s.isEmpty =>
case c @ ConfigFile(_, Outputs(good, Some(Output.Kinesis(s, _, _, _, _, _, _, _, _, _, _, _, _)), bad), _, _, _, _, _) if s.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _) if t.isEmpty =>
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _, _) if t.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
case other => other.asRight
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,4 +353,15 @@ object io {
implicit val telemetryEncoder: Encoder[Telemetry] =
deriveConfiguredEncoder[Telemetry]
}

case class FeatureFlags(
validateEnrichedEvents: Boolean
)

object FeatureFlags {
implicit val telemetryDecoder: Decoder[FeatureFlags] =
deriveConfiguredDecoder[FeatureFlags]
implicit val telemetryEncoder: Encoder[FeatureFlags] =
deriveConfiguredEncoder[FeatureFlags]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "com.acme",
"name": "output",
"format": "jsonschema",
"version": "1-0-0"
},
"properties": {
"output": {
"type": "string"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "com.acme",
"name": "test",
"format": "jsonschema",
"version": "1-0-1"
},
"properties": {
"path": {
"properties": {
"id": {
"type": "integer"
}
}
}
}
}
Loading

0 comments on commit 8a8476b

Please sign in to comment.