Skip to content

Commit

Permalink
Add eventVolume and platform to observed_event (close #795)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Jul 19, 2023
1 parent be3d839 commit 3929c49
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, HttpClient,

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, ParsedConfigs}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{
Cloud,
Concurrency,
FeatureFlags,
RemoteAdapterConfigs,
Expand Down Expand Up @@ -124,7 +125,7 @@ final case class Environment[F[_], A](
processor: Processor,
streamsSettings: Environment.StreamsSettings,
region: Option[String],
cloud: Option[Telemetry.Cloud],
cloud: Option[Cloud],
featureFlags: FeatureFlags
)

Expand Down Expand Up @@ -179,7 +180,7 @@ object Environment {
getPayload: A => Array[Byte],
processor: Processor,
maxRecordSize: Int,
cloud: Option[Telemetry.Cloud],
cloud: Option[Cloud],
getRegion: => Option[String],
featureFlags: FeatureFlags
): Resource[F, Environment[F, A]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import retry.syntax.all._

import com.snowplowanalytics.snowplow.badrows.Processor

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Input, Monitoring, Output, RetryCheckpointing}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Cloud, Input, Monitoring, Output, RetryCheckpointing}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{CliConfig, ParsedConfigs}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{FileSink, Retries, Source}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client
Expand All @@ -54,7 +54,7 @@ object Run {
mkClients: List[Blocker => Resource[F, Client[F]]],
getPayload: A => Array[Byte],
maxRecordSize: Int,
cloud: Option[Telemetry.Cloud],
cloud: Option[Cloud],
getRegion: => Option[String]
): F[ExitCode] =
CliConfig.command(name, version, description).parse(args) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import fs2.Stream
import org.http4s.client.{Client => HttpClient}

import _root_.io.circe.Json
import _root_.io.circe.Encoder
import _root_.io.circe.syntax._

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
Expand All @@ -36,6 +35,7 @@ import com.snowplowanalytics.snowplow.scalatracker.Emitter.{Result => TrackerRes
import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.Http4sEmitter

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Telemetry => TelemetryConfig}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Cloud

object Telemetry {

Expand Down Expand Up @@ -119,13 +119,4 @@ object Telemetry {
"applicationVersion" -> appVersion.asJson
)
)

sealed trait Cloud

object Cloud {
case object Aws extends Cloud
case object Gcp extends Cloud

implicit val encoder: Encoder[Cloud] = Encoder.encodeString.contramap[Cloud](_.toString().toUpperCase())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -583,4 +583,14 @@ object io {
implicit val veroSchemasEncoder: Encoder[VeroSchemas] =
deriveConfiguredEncoder[VeroSchemas]
}

/** Closed set of cloud identifiers: `Aws`, `Gcp` or `Azure`. */
sealed trait Cloud

object Cloud {
  case object Aws extends Cloud
  case object Gcp extends Cloud
  case object Azure extends Cloud

  /** Encodes a [[Cloud]] as a JSON string holding its upper-cased name, e.g. `Aws` becomes `"AWS"`. */
  implicit val encoder: Encoder[Cloud] =
    Encoder[String].contramap { (cloud: Cloud) =>
      cloud.toString.toUpperCase
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
import cats.implicits._
import cats.Applicative
import cats.data.NonEmptyList
import cats.kernel.Semigroup
import cats.effect.{Async, Clock, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
import cats.effect.concurrent.Ref
import fs2.Stream
Expand Down Expand Up @@ -49,7 +50,17 @@ trait Metadata[F[_]] {
}

object Metadata {
type EventsToEntities = Map[MetadataEvent, Set[SchemaKey]]
/** What has been aggregated so far for each distinct [[MetadataEvent]]. */
type Aggregates = Map[MetadataEvent, EntitiesAndCount]
/**
 * Values aggregated for one [[MetadataEvent]]
 *
 * @param entities - schema keys of the entities seen attached to the event
 * @param count - how many times the event was seen (reported as `eventVolume`)
 */
case class EntitiesAndCount(entities: Set[SchemaKey], count: Int)

/** Merges two [[EntitiesAndCount]] by combining their entity sets with `|+|` and summing their counts. */
implicit private def entitiesAndCountSemigroup: Semigroup[EntitiesAndCount] =
  Semigroup.instance { (left, right) =>
    val mergedEntities = left.entities |+| right.entities
    val totalCount = left.count + right.count
    EntitiesAndCount(mergedEntities, totalCount)
  }

private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]
Expand All @@ -69,7 +80,7 @@ object Metadata {
} yield ()

def observe(events: List[EnrichedEvent]): F[Unit] =
observedRef.eventsToEntities.update(recalculate(_, events))
observedRef.aggregates.update(recalculate(_, events))
}
}

Expand All @@ -82,17 +93,18 @@ object Metadata {
private def submit[F[_]: Sync: Clock](reporter: MetadataReporter[F], ref: MetadataEventsRef[F]): F[Unit] =
for {
snapshot <- MetadataEventsRef.snapshot(ref)
_ <- snapshot.eventsToEntities.keySet.toList
.traverse(reporter.report(snapshot.periodStart, snapshot.periodEnd)(_, snapshot.eventsToEntities))
_ <- snapshot.aggregates.toList.traverse {
case (event, entitiesAndCount) =>
reporter.report(snapshot.periodStart, snapshot.periodEnd, event, entitiesAndCount)
}
} yield ()

trait MetadataReporter[F[_]] {
def report(
periodStart: Instant,
periodEnd: Instant
)(
snapshot: MetadataEvent,
eventsToEntities: EventsToEntities
periodEnd: Instant,
event: MetadataEvent,
entitiesAndCount: EntitiesAndCount
): F[Unit]
}

Expand Down Expand Up @@ -121,16 +133,15 @@ object Metadata {

def report(
periodStart: Instant,
periodEnd: Instant
)(
periodEnd: Instant,
event: MetadataEvent,
eventsToEntities: EventsToEntities
entitiesAndCount: EntitiesAndCount
): F[Unit] =
initTracker(config, appName, client).use { t =>
Logger[F].info(s"Tracking observed event ${event.schema.toSchemaUri}") >>
t.trackSelfDescribingEvent(
mkWebhookEvent(config.organizationId, config.pipelineId, periodStart, periodEnd, event),
eventsToEntities.find(_._1 == event).map(_._2).toSeq.flatMap(mkWebhookContexts)
mkWebhookEvent(config.organizationId, config.pipelineId, periodStart, periodEnd, event, entitiesAndCount.count),
mkWebhookContexts(entitiesAndCount.entities).toSeq
) >> t.flushEmitters()
}

Expand Down Expand Up @@ -159,11 +170,13 @@ object Metadata {
* @param schema - schema key of given event
* @param source - `app_id` for given event
* @param tracker - `v_tracker` for given event
* @param platform - The platform the app runs on for given event (`platform` field)
*/
case class MetadataEvent(
schema: SchemaKey,
source: Option[String],
tracker: Option[String]
tracker: Option[String],
platform: Option[String]
)
object MetadataEvent {
def apply(event: EnrichedEvent): MetadataEvent =
Expand All @@ -175,46 +188,47 @@ object Metadata {
Option(event.event_version).toRight("unknown-version").flatMap(SchemaVer.parseFull).getOrElse(SchemaVer.Full(0, 0, 0))
),
Option(event.app_id),
Option(event.v_tracker)
Option(event.v_tracker),
Option(event.platform)
)
}

/**
* A representation of observed metadata events and entities attached to them over a period of time
*
* @param eventsToEntities - mappings of entities observed (since `periodStart`) for given `MetadataEvent`s
* @param periodStart - since when `eventsToEntities` are accumulated
* @param periodEnd - until when `eventsToEntities` are accumulated
* @param aggregates - entities observed and number of events seen (since `periodStart`) for each `MetadataEvent`
* @param periodStart - since when `aggregates` are accumulated
* @param periodEnd - until when `aggregates` are accumulated
*/
case class MetadataSnapshot(
eventsToEntities: EventsToEntities,
aggregates: Aggregates,
periodStart: Instant,
periodEnd: Instant
)

/**
* Internal state representation for current metadata period
* @param eventsToEntities - mappings of entities observed (since `periodStart`) for given `MetadataEvent`s
* @param periodStart - since when `eventsToEntities` are accumulated
* @param aggregates - entities observed and number of events seen (since `periodStart`) for each `MetadataEvent`
* @param periodStart - since when `aggregates` are accumulated
*/
case class MetadataEventsRef[F[_]](
eventsToEntities: Ref[F, EventsToEntities],
aggregates: Ref[F, Aggregates],
periodStart: Ref[F, Instant]
)

object MetadataEventsRef {
def init[F[_]: Sync: Clock]: F[MetadataEventsRef[F]] =
for {
time <- Clock[F].instantNow
eventsToEntities <- Ref.of[F, EventsToEntities](Map.empty)
aggregates <- Ref.of[F, Aggregates](Map.empty)
periodStart <- Ref.of[F, Instant](time)
} yield MetadataEventsRef(eventsToEntities, periodStart)
} yield MetadataEventsRef(aggregates, periodStart)
def snapshot[F[_]: Sync: Clock](ref: MetadataEventsRef[F]): F[MetadataSnapshot] =
for {
periodEnd <- Clock[F].instantNow
eventsToEntities <- ref.eventsToEntities.getAndSet(Map.empty)
aggregates <- ref.aggregates.getAndSet(Map.empty)
periodStart <- ref.periodStart.getAndSet(periodEnd)
} yield MetadataSnapshot(eventsToEntities, periodStart, periodEnd)
} yield MetadataSnapshot(aggregates, periodStart, periodEnd)
}

def unwrapEntities(event: EnrichedEvent): Set[SchemaKey] = {
Expand All @@ -240,18 +254,19 @@ object Metadata {
SchemaVer.parseFull(event.event_version).getOrElse(SchemaVer.Full(0, 0, 0))
)

def recalculate(previous: EventsToEntities, events: List[EnrichedEvent]): EventsToEntities =
previous |+| events.map(e => Map(MetadataEvent(e) -> unwrapEntities(e))).combineAll
/**
 * Folds a batch of enriched events into the aggregates accumulated so far
 *
 * Each event contributes its entities and a count of 1 under its `MetadataEvent` key;
 * contributions for the same key are merged with `|+|`.
 *
 * @param previous - aggregates accumulated before this batch
 * @param events - enriched events to add
 * @return `previous` combined with the aggregates of `events`
 */
def recalculate(previous: Aggregates, events: List[EnrichedEvent]): Aggregates = {
  val observed: Aggregates = events.foldMap { enriched =>
    Map(MetadataEvent(enriched) -> EntitiesAndCount(unwrapEntities(enriched), 1))
  }
  previous |+| observed
}

def mkWebhookEvent(
organizationId: UUID,
pipelineId: UUID,
periodStart: Instant,
periodEnd: Instant,
event: MetadataEvent
event: MetadataEvent,
count: Int
): SelfDescribingData[Json] =
SelfDescribingData(
SchemaKey("com.snowplowanalytics.console", "observed_event", "jsonschema", SchemaVer.Full(4, 0, 0)),
SchemaKey("com.snowplowanalytics.console", "observed_event", "jsonschema", SchemaVer.Full(6, 0, 0)),
Json.obj(
"organizationId" -> organizationId.asJson,
"pipelineId" -> pipelineId.asJson,
Expand All @@ -260,6 +275,8 @@ object Metadata {
"eventVersion" -> event.schema.version.asString.asJson,
"source" -> event.source.getOrElse("unknown-source").asJson,
"tracker" -> event.tracker.getOrElse("unknown-tracker").asJson,
"platform" -> event.platform.getOrElse("unknown-platform").asJson,
"eventVolume" -> Json.fromInt(count),
"periodStart" -> periodStart.asJson,
"periodEnd" -> periodEnd.asJson
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,25 @@ import org.specs2.mutable.Specification
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Metadata => MetadataConfig}
import Metadata.{MetadataEvent, MetadataReporter}
import Metadata.{EntitiesAndCount, MetadataEvent, MetadataReporter}

class MetadataSpec extends Specification with CatsIO {
case class Report(
periodStart: Instant,
periodEnd: Instant,
event: SchemaKey,
entities: Set[SchemaKey]
entitiesAndCount: EntitiesAndCount
)
case class TestReporter[F[_]](state: Ref[F, List[Report]]) extends MetadataReporter[F] {
def report(periodStart: Instant, periodEnd: Instant)(event: MetadataEvent, mappings: Map[MetadataEvent, Set[SchemaKey]]): F[Unit] =

def report(
periodStart: Instant,
periodEnd: Instant,
event: MetadataEvent,
entitiesAndCount: EntitiesAndCount
): F[Unit] =
state.update(
_ :+ Report(periodStart, periodEnd, event.schema, mappings.find(_._1 == event).map(_._2).toSet.flatten)
_ :+ Report(periodStart, periodEnd, event.schema, entitiesAndCount)
)
}

Expand All @@ -65,12 +71,13 @@ class MetadataSpec extends Specification with CatsIO {
res.map(_.event) should containTheSameElementsAs(
List(SchemaKey("unknown-vendor", "unknown-name", "unknown-format", SchemaVer.Full(0, 0, 0)))
)
res.flatMap(_.entities) should containTheSameElementsAs(
res.flatMap(_.entitiesAndCount.entities) should containTheSameElementsAs(
Seq(
SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 0)),
SchemaKey("org.w3", "PerformanceTiming", "jsonschema", SchemaVer.Full(1, 0, 0))
)
)
res.map(_.entitiesAndCount.count) should beEqualTo(List(1))
}
}

Expand All @@ -92,7 +99,7 @@ class MetadataSpec extends Specification with CatsIO {
"add metadata event to empty state" in {
val enriched = MetadataSpec.enriched
Metadata.recalculate(Map.empty, List(enriched)) should containTheSameElementsAs(
Seq((MetadataEvent(enriched) -> Set.empty))
Seq(MetadataEvent(enriched) -> EntitiesAndCount(Set.empty, 1))
)
}

Expand All @@ -101,9 +108,9 @@ class MetadataSpec extends Specification with CatsIO {
val other = MetadataSpec.enriched
val v1_0_1 = SchemaVer.Full(1, 0, 1)
other.event_version = v1_0_1.asString
val previous = Map(MetadataEvent(enriched) -> Set.empty[SchemaKey])
val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(Set.empty[SchemaKey], 1))
Metadata.recalculate(previous, List(other)) should containTheSameElementsAs(
previous.toSeq ++ Seq(MetadataEvent(other) -> Set.empty[SchemaKey])
previous.toSeq ++ Seq(MetadataEvent(other) -> EntitiesAndCount(Set.empty[SchemaKey], 1))
)
}

Expand All @@ -119,9 +126,9 @@ class MetadataSpec extends Specification with CatsIO {
enrichedBis.contexts =
"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-1","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}]}"""
val entityBis = SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 1))
val previous = Map(MetadataEvent(enriched) -> entities)
val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(entities, 1))
Metadata.recalculate(previous, List(enrichedBis)) should containTheSameElementsAs(
Seq(MetadataEvent(enriched) -> (entities + entityBis))
Seq(MetadataEvent(enriched) -> EntitiesAndCount(entities + entityBis, 2))
)
}

Expand All @@ -141,9 +148,9 @@ class MetadataSpec extends Specification with CatsIO {
enrichedTer.contexts =
"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-2","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}]}"""
val entityTer = SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 2))
val previous = Map(MetadataEvent(enriched) -> entities)
val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(entities, 1))
Metadata.recalculate(previous, List(enrichedBis, enrichedTer)) should containTheSameElementsAs(
Seq(MetadataEvent(enriched) -> (entities + entityBis + entityTer))
Seq(MetadataEvent(enriched) -> EntitiesAndCount(entities + entityBis + entityTer, 3))
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2.test
import cats.effect.Sync
import cats.effect.concurrent.Ref
import fs2.Stream
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata.MetadataEvent
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata.{EntitiesAndCount, MetadataEvent}

object Aggregates {
def init[F[_]: Sync] = Ref.of[F, Map[MetadataEvent, Set[SchemaKey]]](Map.empty)
object AggregatesSpec {
def init[F[_]: Sync] = Ref.of[F, Map[MetadataEvent, EntitiesAndCount]](Map.empty)

def metadata[F[_]](ref: Ref[F, Map[MetadataEvent, Set[SchemaKey]]]): Metadata[F] =
def metadata[F[_]](ref: Ref[F, Map[MetadataEvent, EntitiesAndCount]]): Metadata[F] =
new Metadata[F] {
def report: Stream[F, Unit] = Stream.empty.covary[F]
def observe(events: List[EnrichedEvent]): F[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ object TestEnvironment extends CatsIO {
_ <- filesResource(blocker, enrichments.flatMap(_.filesToCache).map(p => Paths.get(p._2)))
counter <- Resource.eval(Counter.make[IO])
metrics = Counter.mkCounterMetrics[IO](counter)(Monad[IO], ioClock)
aggregates <- Resource.eval(Aggregates.init[IO])
metadata = Aggregates.metadata[IO](aggregates)
aggregates <- Resource.eval(AggregatesSpec.init[IO])
metadata = AggregatesSpec.metadata[IO](aggregates)
clients = Clients.init[IO](http, Nil)
sem <- Resource.eval(Semaphore[IO](1L))
assetsState <- Resource.eval(Assets.State.make(blocker, sem, clients, enrichments.flatMap(_.filesToCache)))
Expand Down
Loading

0 comments on commit 3929c49

Please sign in to comment.