common-fs2: Add experimental metadata aggregation sink (close #569)
This adds a separate, experimental metadata aggregation that exports
event-entity cluster data. At each configured interval, the observed
clusters are sent to a webhook.
peel committed Mar 23, 2022
1 parent 918d5cc commit 59df3f8
Showing 13 changed files with 607 additions and 18 deletions.
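
The aggregation itself lives in a new file, modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala, whose diff is not rendered on this page. A rough sketch of its interface, inferred purely from the call sites in this commit (observe, report, submit, build, noop, HttpMetadataReporter) and therefore not authoritative:

import fs2.Stream
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

// Shape inferred from usage in this diff, not the actual file contents.
trait Metadata[F[_]] {
  // Periodic aggregation/reporting loop, merged into the main flow in Run.scala
  def report: Stream[F, Unit]
  // Records an event's event-entity combination on the hot path
  def observe(event: EnrichedEvent): F[Unit]
  // Final flush of the current window, compiled and run as the pre-shutdown hook
  def submit: Stream[F, Unit]
}

As used in Environment.scala below, the companion object also appears to provide build (taking the experimental.metadata config and an HttpMetadataReporter backed by the shared http4s client, returning F[Metadata[F]]) and a noop instance for when no metadata block is configured.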
12 changes: 12 additions & 0 deletions config/config.kinesis.extended.hocon
@@ -405,4 +405,16 @@
# More details: https://github.com/snowplow/enrich/issues/517#issuecomment-1033910690
"acceptInvalid": false
}

# Optional. Configuration for experimental/preview features
"experimental": {
# Whether to export metadata using a webhook URL.
# Follows iglu-webhook protocol.
"metadata": {
"endpoint": "https://my_pipeline.my_domain.com/iglu"
"interval": 5 minutes
"organizationId": "c5f3a09f-75f8-4309-bec5-fea560f78455"
"pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784"
}
}
}
12 changes: 12 additions & 0 deletions config/config.pubsub.extended.hocon
@@ -204,4 +204,16 @@
# More details: https://github.com/snowplow/enrich/issues/517#issuecomment-1033910690
"acceptInvalid": false
}

# Optional. Configuration for experimental/preview features
"experimental": {
# Whether to export metadata using a webhook URL.
# Follows iglu-webhook protocol.
"metadata": {
"endpoint": "https://my_pipeline.my_domain.com/iglu"
"interval": 5 minutes
"organizationId": "c5f3a09f-75f8-4309-bec5-fea560f78455"
"pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784"
}
}
}
modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala
@@ -90,7 +90,7 @@ object Enrich {
_.parEvalMap(env.streamsSettings.concurrency.sink)(sinkChunk(_, sinkOne(env), env.metrics.enrichLatency))
.evalMap(env.checkpoint)

Stream.eval(runWithShutdown(enriched, sinkAndCheckpoint))
Stream.eval(runWithShutdown(enriched, sinkAndCheckpoint, env.preShutdown.getOrElse(() => Sync[F].unit)))
}

/**
@@ -189,7 +189,11 @@ object Enrich {
serializeEnriched(enriched, env.processor, env.streamsSettings.maxRecordSize) match {
case Left(br) => sinkBad(env, br)
case Right(bytes) =>
List(env.metrics.goodCount, env.sinkGood(AttributedData(bytes, env.goodAttributes(enriched))), sinkPii(env, enriched)).parSequence_
List(env.metrics.goodCount,
env.metadata.observe(enriched),
env.sinkGood(AttributedData(bytes, env.goodAttributes(enriched))),
sinkPii(env, enriched)
).parSequence_
}

def sinkPii[F[_]: Monad, A](env: Environment[F, A], enriched: EnrichedEvent): F[Unit] =
@@ -252,13 +256,14 @@ object Enrich {
* We use a queue as a level of indirection between the stream of enriched events and the sink + checkpointing.
* When we receive a SIGINT or exception then we terminate the fiber by pushing a `None` to the queue.
*
* The stream is only cancelled after the sink + checkpointing have been allowed to finish cleanly.
* We must not terminate the source any earlier, because this would shut down the Kinesis scheduler too early,
* and then we would not be able to checkpoint the outstanding records.
*/
private def runWithShutdown[F[_]: Concurrent: Sync: Timer, A](
enriched: Stream[F, List[(A, Result)]],
sinkAndCheckpoint: Pipe[F, List[(A, Result)], Unit]
sinkAndCheckpoint: Pipe[F, List[(A, Result)], Unit],
preShutdown: () => F[Unit]
): F[Unit] =
Queue.synchronousNoneTerminated[F, List[(A, Result)]].flatMap { queue =>
queue.dequeue
@@ -270,18 +275,18 @@
.bracketCase(_.join) {
case (_, ExitCase.Completed) =>
// The source has completed "naturally", e.g. processed all input files in the directory
Sync[F].unit
preShutdown()
case (fiber, ExitCase.Canceled) =>
// SIGINT received. We wait for the enriched events already in the queue to get sunk and checkpointed
terminateStream(queue, fiber)
preShutdown() *> terminateStream(queue, fiber)
case (fiber, ExitCase.Error(e)) =>
// Runtime exception in the stream of enriched events.
// We wait for the enriched events already in the queue to get sunk and checkpointed.
// We then raise the original exception
Logger[F].error(e)("Unexpected error in enrich") *>
terminateStream(queue, fiber).handleErrorWith { e2 =>
Logger[F].error(e2)("Error when terminating the stream")
} *> Sync[F].raiseError(e)
preShutdown() *> terminateStream(queue, fiber).handleErrorWith { e2 =>
Logger[F].error(e2)("Error when terminating the stream")
} *> Sync[F].raiseError(e)
}
}

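The ordering guarantee here: on both the SIGINT (Canceled) and error paths, preShutdown() — wired below to the metadata flush — runs before terminateStream (not shown in this diff; per the comment above it pushes the terminating None and waits for the fiber), so the final metadata window is submitted while the sink-and-checkpoint fiber can still drain cleanly; on the error path the original exception is still re-raised afterwards.
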
modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala
@@ -46,6 +46,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, Pars
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Telemetry => TelemetryConfig}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{Clients, Metrics}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata

import scala.concurrent.ExecutionContext
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input.Kinesis
@@ -72,6 +73,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input.Kinesis
* @param getPayload function that extracts the collector payload bytes from a record
* @param sentry optional sentry client
* @param metrics common counters
* @param metadata metadata aggregations
* @param assetsUpdatePeriod time after which enrich assets should be refreshed
* @param goodAttributes fields from an enriched event to use as output message attributes
* @param piiAttributes fields from a PII event to use as output message attributes
@@ -101,6 +103,7 @@ final case class Environment[F[_], A](
getPayload: A => Array[Byte],
sentry: Option[SentryClient],
metrics: Metrics[F],
metadata: Metadata[F],
assetsUpdatePeriod: Option[FiniteDuration],
goodAttributes: EnrichedEvent => Map[String, String],
piiAttributes: EnrichedEvent => Map[String, String],
@@ -109,7 +112,8 @@ final case class Environment[F[_], A](
streamsSettings: Environment.StreamsSettings,
region: Option[String],
cloud: Option[Telemetry.Cloud],
acceptInvalid: Boolean
acceptInvalid: Boolean,
preShutdown: Option[() => F[Unit]]
)

object Environment {
@@ -169,6 +173,7 @@ object Environment {
clts <- clients.map(Clients.init(http, _))
igluClient <- IgluClient.parseDefault[F](parsedConfigs.igluJson).resource
metrics <- Resource.eval(metricsReporter[F](blocker, file))
metadata <- Resource.eval(metadataReporter[F](file, processor.artifact, http))
assets = parsedConfigs.enrichmentConfigs.flatMap(_.filesToCache)
sem <- Resource.eval(Semaphore(1L))
assetsState <- Resource.eval(Assets.State.make[F](blocker, sem, clts, assets))
@@ -189,6 +194,7 @@
getPayload,
sentry,
metrics,
metadata,
file.assetsUpdatePeriod,
parsedConfigs.goodAttributes,
parsedConfigs.piiAttributes,
@@ -197,7 +203,8 @@
StreamsSettings(file.concurrency, maxRecordSize),
getRegionFromConfig(file).orElse(getRegion),
cloud,
acceptInvalid
acceptInvalid,
Some(() => metadata.submit.compile.drain)
)
}

@@ -219,6 +226,16 @@ object Environment {
private def metricsReporter[F[_]: ConcurrentEffect: ContextShift: Timer](blocker: Blocker, config: ConfigFile): F[Metrics[F]] =
config.monitoring.flatMap(_.metrics).map(Metrics.build[F](blocker, _)).getOrElse(Metrics.noop[F].pure[F])

private def metadataReporter[F[_]: ConcurrentEffect: ContextShift: Timer](
config: ConfigFile,
appName: String,
httpClient: HttpClient[F]
): F[Metadata[F]] =
config.experimental
.flatMap(_.metadata)
.map(metadataConfig => Metadata.build[F](metadataConfig, Metadata.HttpMetadataReporter[F](metadataConfig, appName, httpClient)))
.getOrElse(Metadata.noop[F].pure[F])

private implicit class EitherTOps[F[_], E: Show, A](eitherT: EitherT[F, E, A]) {
def resource(implicit F: Sync[F]): Resource[F, A] = {
val action: F[A] = eitherT.value.flatMap {
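Note the pre-shutdown hook wired in above: Some(() => metadata.submit.compile.drain) compiles the final submit stream down to a single F[Unit], which Enrich.runWithShutdown executes before the source terminates, so the last partial aggregation window still reaches the webhook on shutdown.
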
modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala
@@ -151,7 +151,8 @@ object Run {
val updates = Assets.run[F, A](env.blocker, env.semaphore, env.assetsUpdatePeriod, env.assetsState, env.enrichments)
val telemetry = Telemetry.run[F, A](env)
val reporting = env.metrics.report
val flow = enrich.merge(updates).merge(reporting).merge(telemetry)
val metadata = env.metadata.report
val flow = enrich.merge(updates).merge(reporting).merge(telemetry).merge(metadata)
log >> flow.compile.drain.as(ExitCode.Success).recoverWith {
case exception: Throwable =>
Logger[F].error(s"The Enrich job has stopped") >>
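(fs2's merge runs both streams concurrently and interleaves their outputs nondeterministically, so the periodic metadata reporting loop runs alongside enrichment, asset updates, metrics, and telemetry without blocking any of them.)
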
modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala
@@ -27,7 +27,16 @@ import pureconfig.ConfigSource
import pureconfig.module.catseffect.syntax._
import pureconfig.module.circe._

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, FeatureFlags, Input, Monitoring, Output, Outputs, Telemetry}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{
Concurrency,
Experimental,
FeatureFlags,
Input,
Monitoring,
Output,
Outputs,
Telemetry
}

/**
* Parsed HOCON configuration file
@@ -38,6 +47,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency,
* @param monitoring configuration for sentry and metrics
* @param telemetry configuration for telemetry
* @param featureFlags to activate/deactivate enrich features
* @param experimental configuration for experimental features
*/
final case class ConfigFile(
input: Input,
@@ -46,7 +56,8 @@ final case class ConfigFile(
assetsUpdatePeriod: Option[FiniteDuration],
monitoring: Option[Monitoring],
telemetry: Telemetry,
featureFlags: FeatureFlags
featureFlags: FeatureFlags,
experimental: Option[Experimental]
)

object ConfigFile {
@@ -57,13 +68,14 @@ object ConfigFile {

implicit val configFileDecoder: Decoder[ConfigFile] =
deriveConfiguredDecoder[ConfigFile].emap {
case ConfigFile(_, _, _, Some(aup), _, _, _) if aup._1 <= 0L =>
case ConfigFile(_, _, _, Some(aup), _, _, _, _) if aup._1 <= 0L =>
"assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype
// Remove pii output if streamName and region empty
case c @ ConfigFile(_, Outputs(good, Some(Output.Kinesis(s, _, _, _, _, _, _, _, _, _, _, _, _)), bad), _, _, _, _, _) if s.isEmpty =>
case c @ ConfigFile(_, Outputs(good, Some(Output.Kinesis(s, _, _, _, _, _, _, _, _, _, _, _, _)), bad), _, _, _, _, _, _)
if s.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _, _) if t.isEmpty =>
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _, _, _) if t.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
case other => other.asRight
}
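Because experimental is an Option[Experimental] (and metadata an Option inside it), configuration files that predate this block keep decoding unchanged: a missing key decodes to None, and Environment then falls back to the noop metadata reporter.
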
modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala
@@ -15,6 +15,7 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2.config
import java.nio.file.{InvalidPathException, Path, Paths}
import java.time.Instant
import java.net.URI
import java.util.UUID

import cats.syntax.either._

@@ -24,6 +25,7 @@ import _root_.io.circe.{Decoder, DecodingFailure, Encoder}
import _root_.io.circe.generic.extras.semiauto._
import _root_.io.circe.config.syntax._
import _root_.io.circe.DecodingFailure
import org.http4s.{ParseFailure, Uri}

object io {

@@ -44,6 +46,12 @@ object io {
implicit val javaUriEncoder: Encoder[URI] =
Encoder.encodeString.contramap[URI](_.toString)

implicit val http4sUriDecoder: Decoder[Uri] =
Decoder[String].emap(s => Either.catchOnly[ParseFailure](Uri.unsafeFromString(s)).leftMap(_.toString))

implicit val http4sUriEncoder: Encoder[Uri] =
Encoder[String].contramap(_.toString)

import ConfigFile.finiteDurationEncoder

/** Source of raw collector data (only PubSub supported atm) */
@@ -354,6 +362,27 @@ object io {
deriveConfiguredEncoder[Telemetry]
}

case class Metadata(
endpoint: Uri,
interval: FiniteDuration,
organizationId: UUID,
pipelineId: UUID
)
object Metadata {
implicit val metadataDecoder: Decoder[Metadata] =
deriveConfiguredDecoder[Metadata]
implicit val metadataEncoder: Encoder[Metadata] =
deriveConfiguredEncoder[Metadata]
}

case class Experimental(metadata: Option[Metadata])
object Experimental {
implicit val experimentalDecoder: Decoder[Experimental] =
deriveConfiguredDecoder[Experimental]
implicit val experimentalEncoder: Encoder[Experimental] =
deriveConfiguredEncoder[Experimental]
}

case class FeatureFlags(
acceptInvalid: Boolean
)
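
For reference, the experimental block from the extended config templates above decodes into values equivalent to the following construction (a sketch using only the case classes and decoders added here, with the values copied from the templates):

import java.util.UUID
import scala.concurrent.duration._
import org.http4s.Uri

val experimental = Experimental(
  metadata = Some(
    Metadata( // io.Metadata, the configuration case class above
      endpoint = Uri.unsafeFromString("https://my_pipeline.my_domain.com/iglu"),
      interval = 5.minutes,
      organizationId = UUID.fromString("c5f3a09f-75f8-4309-bec5-fea560f78455"),
      pipelineId = UUID.fromString("75a13583-5c99-40e3-81fc-541084dfc784")
    )
  )
)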
Diffs for the remaining changed files (including the new io/experimental/Metadata.scala) are not rendered here.
