From 2b04f8da3c7b70ba0808987f7b875627ec2617e9 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Wed, 12 Jul 2023 14:01:01 +0200 Subject: [PATCH] Add eventVolume and platform to observed_event (close #795) --- .../enrich/common/fs2/Environment.scala | 12 ++-- .../snowplow/enrich/common/fs2/Run.scala | 4 +- .../enrich/common/fs2/Telemetry.scala | 11 +-- .../enrich/common/fs2/config/io.scala | 10 +++ .../common/fs2/io/experimental/Metadata.scala | 72 +++++++++++-------- .../enrich/common/fs2/io/MetadataSpec.scala | 26 +++---- ...{Aggregates.scala => AggregatesSpec.scala} | 9 ++- .../common/fs2/test/TestEnvironment.scala | 4 +- .../Main.scala | 3 +- .../snowplow/enrich/kinesis/KinesisRun.scala | 4 +- .../snowplow/enrich/pubsub/Main.scala | 4 +- .../snowplow/enrich/rabbitmq/Main.scala | 3 +- 12 files changed, 90 insertions(+), 72 deletions(-) rename modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/{Aggregates.scala => AggregatesSpec.scala} (82%) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala index c32efd30f..50f471e6f 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala @@ -43,6 +43,7 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, HttpClient, import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, ParsedConfigs} import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{ + Cloud, Concurrency, FeatureFlags, RemoteAdapterConfigs, @@ -124,7 +125,7 @@ final case class Environment[F[_], A]( processor: Processor, streamsSettings: Environment.StreamsSettings, region: Option[String], - cloud: Option[Telemetry.Cloud], + cloud: Option[Cloud], featureFlags: FeatureFlags ) @@ -179,7 +180,7 @@ object Environment { getPayload: A => Array[Byte], processor: Processor, maxRecordSize: Int, - cloud: Option[Telemetry.Cloud], + cloud: Option[Cloud], getRegion: => Option[String], featureFlags: FeatureFlags ): Resource[F, Environment[F, A]] = { @@ -195,7 +196,7 @@ object Environment { igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource remoteAdaptersEnabled = file.remoteAdapters.configs.nonEmpty metrics <- Resource.eval(Metrics.build[F](blocker, file.monitoring.metrics, remoteAdaptersEnabled)) - metadata <- Resource.eval(metadataReporter[F](file, processor.artifact, http)) + metadata <- Resource.eval(metadataReporter[F](file, processor.artifact, http, cloud)) assets = parsedConfigs.enrichmentConfigs.flatMap(_.filesToCache) (remoteAdaptersHttpClient, remoteAdapters) <- prepareRemoteAdapters[F](file.remoteAdapters, ec, metrics) adapterRegistry = new AdapterRegistry(remoteAdapters) @@ -258,11 +259,12 @@ object Environment { private def metadataReporter[F[_]: ConcurrentEffect: ContextShift: Timer]( config: ConfigFile, appName: String, - httpClient: Http4sClient[F] + httpClient: Http4sClient[F], + cloud: Option[Cloud] ): F[Metadata[F]] = config.experimental .flatMap(_.metadata) - .map(metadataConfig => Metadata.build[F](metadataConfig, Metadata.HttpMetadataReporter[F](metadataConfig, appName, httpClient))) + .map(metadataConfig => Metadata.build[F](metadataConfig, Metadata.HttpMetadataReporter[F](metadataConfig, appName, httpClient, cloud))) 
.getOrElse(Metadata.noop[F].pure[F]) private implicit class EitherTOps[F[_], E: Show, A](eitherT: EitherT[F, E, A]) { diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala index 7cefda86b..241c177bc 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala @@ -28,7 +28,7 @@ import retry.syntax.all._ import com.snowplowanalytics.snowplow.badrows.Processor -import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Input, Monitoring, Output, RetryCheckpointing} +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Cloud, Input, Monitoring, Output, RetryCheckpointing} import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{CliConfig, ParsedConfigs} import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{FileSink, Retries, Source} import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client @@ -54,7 +54,7 @@ object Run { mkClients: List[Blocker => Resource[F, Client[F]]], getPayload: A => Array[Byte], maxRecordSize: Int, - cloud: Option[Telemetry.Cloud], + cloud: Option[Cloud], getRegion: => Option[String] ): F[ExitCode] = CliConfig.command(name, version, description).parse(args) match { diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Telemetry.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Telemetry.scala index 2cee8eda6..1f2b49434 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Telemetry.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Telemetry.scala @@ -25,7 +25,6 @@ import fs2.Stream import org.http4s.client.{Client => HttpClient} import _root_.io.circe.Json -import _root_.io.circe.Encoder import _root_.io.circe.syntax._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} @@ -36,6 +35,7 @@ import com.snowplowanalytics.snowplow.scalatracker.Emitter.{Result => TrackerRes import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.Http4sEmitter import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Telemetry => TelemetryConfig} +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Cloud object Telemetry { @@ -119,13 +119,4 @@ object Telemetry { "applicationVersion" -> appVersion.asJson ) ) - - sealed trait Cloud - - object Cloud { - case object Aws extends Cloud - case object Gcp extends Cloud - - implicit val encoder: Encoder[Cloud] = Encoder.encodeString.contramap[Cloud](_.toString().toUpperCase()) - } } diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala index c89169f7b..ad553a0c6 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala @@ -512,4 +512,14 @@ object io { legacyEnrichmentOrder = ff.legacyEnrichmentOrder ) } + + sealed trait Cloud + + object Cloud { + case object Aws extends Cloud + case object Gcp extends Cloud + case object Azure extends Cloud + + 
implicit val encoder: Encoder[Cloud] = Encoder.encodeString.contramap[Cloud](_.toString().toUpperCase()) + } } diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala index 796b12880..1a9cea3f0 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala @@ -21,6 +21,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger import cats.implicits._ import cats.Applicative import cats.data.NonEmptyList +import cats.kernel.Semigroup import cats.effect.{Async, Clock, ConcurrentEffect, ContextShift, Resource, Sync, Timer} import cats.effect.concurrent.Ref import fs2.Stream @@ -34,7 +35,7 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.snowplow.scalatracker.{Emitter, Tracker} import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.Http4sEmitter -import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Metadata => MetadataConfig} +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Cloud, Metadata => MetadataConfig} import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent /** @@ -49,7 +50,16 @@ trait Metadata[F[_]] { } object Metadata { - type EventsToEntities = Map[MetadataEvent, Set[SchemaKey]] + type Aggregates = Map[MetadataEvent, EntitiesAndCount] + case class EntitiesAndCount(entities: Set[SchemaKey], count: Int) + + implicit private def entitiesAndCountSemigroup: Semigroup[EntitiesAndCount] = new Semigroup[EntitiesAndCount] { + override def combine(x: EntitiesAndCount, y: EntitiesAndCount): EntitiesAndCount = + EntitiesAndCount( + x.entities |+| y.entities, + x.count + y.count + ) + } private implicit def unsafeLogger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] @@ -69,7 +79,7 @@ object Metadata { } yield () def observe(events: List[EnrichedEvent]): F[Unit] = - observedRef.eventsToEntities.update(recalculate(_, events)) + observedRef.aggregates.update(recalculate(_, events)) } } @@ -82,24 +92,25 @@ object Metadata { private def submit[F[_]: Sync: Clock](reporter: MetadataReporter[F], ref: MetadataEventsRef[F]): F[Unit] = for { snapshot <- MetadataEventsRef.snapshot(ref) - _ <- snapshot.eventsToEntities.keySet.toList - .traverse(reporter.report(snapshot.periodStart, snapshot.periodEnd)(_, snapshot.eventsToEntities)) + _ <- snapshot.aggregates.toList.traverse { case (event, entitiesAndCount) => + reporter.report(snapshot.periodStart, snapshot.periodEnd, event, entitiesAndCount) + } } yield () trait MetadataReporter[F[_]] { def report( periodStart: Instant, - periodEnd: Instant - )( - snapshot: MetadataEvent, - eventsToEntities: EventsToEntities + periodEnd: Instant, + event: MetadataEvent, + entitiesAndCount: EntitiesAndCount ): F[Unit] } case class HttpMetadataReporter[F[_]: ConcurrentEffect: Timer]( config: MetadataConfig, appName: String, - client: Client[F] + client: Client[F], + cloud: Option[Cloud] ) extends MetadataReporter[F] { def initTracker( config: MetadataConfig, @@ -121,16 +132,15 @@ object Metadata { def report( periodStart: Instant, - periodEnd: Instant - )( + periodEnd: Instant, event: MetadataEvent, - eventsToEntities: EventsToEntities 
+      entitiesAndCount: EntitiesAndCount
     ): F[Unit] =
       initTracker(config, appName, client).use { t =>
         Logger[F].info(s"Tracking observed event ${event.schema.toSchemaUri}") >>
           t.trackSelfDescribingEvent(
-            mkWebhookEvent(config.organizationId, config.pipelineId, periodStart, periodEnd, event),
-            eventsToEntities.find(_._1 == event).map(_._2).toSeq.flatMap(mkWebhookContexts)
+            mkWebhookEvent(config.organizationId, config.pipelineId, periodStart, periodEnd, event, cloud, entitiesAndCount.count),
+            mkWebhookContexts(entitiesAndCount.entities).toSeq
           ) >> t.flushEmitters()
       }

@@ -182,23 +192,23 @@ object Metadata {

   /**
   * A representation of observed metadata events and entities attached to them over a period of time
   *
-   * @param eventsToEntities - mappings of entities observed (since `periodStart`) for given `MetadataEvent`s
-   * @param periodStart - since when `eventsToEntities` are accumulated
-   * @param periodEnd - until when `eventsToEntities` are accumulated
+   * @param aggregates - mappings of entities observed (since `periodStart`) for given `MetadataEvent`s
+   * @param periodStart - since when `aggregates` are accumulated
+   * @param periodEnd - until when `aggregates` are accumulated
   */
  case class MetadataSnapshot(
-    eventsToEntities: EventsToEntities,
+    aggregates: Aggregates,
    periodStart: Instant,
    periodEnd: Instant
  )

  /**
   * Internal state representation for current metadata period
-   * @param eventsToEntities - mappings of entities observed (since `periodStart`) for given `MetadataEvent`s
-   * @param periodStart - since when `eventsToEntities` are accumulated
+   * @param aggregates - mappings of entities observed (since `periodStart`) for given `MetadataEvent`s
+   * @param periodStart - since when `aggregates` are accumulated
   */
  case class MetadataEventsRef[F[_]](
-    eventsToEntities: Ref[F, EventsToEntities],
+    aggregates: Ref[F, Aggregates],
    periodStart: Ref[F, Instant]
  )

@@ -206,15 +216,15 @@ object Metadata {
    def init[F[_]: Sync: Clock]: F[MetadataEventsRef[F]] =
      for {
        time <- Clock[F].instantNow
-        eventsToEntities <- Ref.of[F, EventsToEntities](Map.empty)
+        aggregates <- Ref.of[F, Aggregates](Map.empty)
        periodStart <- Ref.of[F, Instant](time)
-      } yield MetadataEventsRef(eventsToEntities, periodStart)
+      } yield MetadataEventsRef(aggregates, periodStart)
    def snapshot[F[_]: Sync: Clock](ref: MetadataEventsRef[F]): F[MetadataSnapshot] =
      for {
        periodEnd <- Clock[F].instantNow
-        eventsToEntities <- ref.eventsToEntities.getAndSet(Map.empty)
+        aggregates <- ref.aggregates.getAndSet(Map.empty)
        periodStart <- ref.periodStart.getAndSet(periodEnd)
-      } yield MetadataSnapshot(eventsToEntities, periodStart, periodEnd)
+      } yield MetadataSnapshot(aggregates, periodStart, periodEnd)
  }

  def unwrapEntities(event: EnrichedEvent): Set[SchemaKey] = {
@@ -240,15 +250,17 @@ object Metadata {
      SchemaVer.parseFull(event.event_version).getOrElse(SchemaVer.Full(0, 0, 0))
    )

-  def recalculate(previous: EventsToEntities, events: List[EnrichedEvent]): EventsToEntities =
-    previous |+| events.map(e => Map(MetadataEvent(e) -> unwrapEntities(e))).combineAll
+  def recalculate(previous: Aggregates, events: List[EnrichedEvent]): Aggregates =
+    previous |+| events.map(e => Map(MetadataEvent(e) -> EntitiesAndCount(unwrapEntities(e), 1))).combineAll

  def mkWebhookEvent(
    organizationId: UUID,
    pipelineId: UUID,
    periodStart: Instant,
    periodEnd: Instant,
-    event: MetadataEvent
+    event: MetadataEvent,
+    platform: Option[Cloud],
+    count: Int
  ): SelfDescribingData[Json] =
    SelfDescribingData(
      SchemaKey("com.snowplowanalytics.console",
"observed_event", "jsonschema", SchemaVer.Full(4, 0, 0)), @@ -260,6 +272,8 @@ object Metadata { "eventVersion" -> event.schema.version.asString.asJson, "source" -> event.source.getOrElse("unknown-source").asJson, "tracker" -> event.tracker.getOrElse("unknown-tracker").asJson, + "platform" -> platform.map(_.asJson).getOrElse(Json.fromString("unknown")), + "eventVolume" -> Json.fromInt(count), "periodStart" -> periodStart.asJson, "periodEnd" -> periodEnd.asJson ) diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/MetadataSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/MetadataSpec.scala index 450ff6000..e872974fb 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/MetadataSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/MetadataSpec.scala @@ -27,19 +27,20 @@ import org.specs2.mutable.Specification import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Metadata => MetadataConfig} -import Metadata.{MetadataEvent, MetadataReporter} +import Metadata.{EntitiesAndCount, MetadataEvent, MetadataReporter} class MetadataSpec extends Specification with CatsIO { case class Report( periodStart: Instant, periodEnd: Instant, event: SchemaKey, - entities: Set[SchemaKey] + entitiesAndCount: EntitiesAndCount ) case class TestReporter[F[_]](state: Ref[F, List[Report]]) extends MetadataReporter[F] { - def report(periodStart: Instant, periodEnd: Instant)(event: MetadataEvent, mappings: Map[MetadataEvent, Set[SchemaKey]]): F[Unit] = + + def report(periodStart: Instant, periodEnd: Instant, event: MetadataEvent, entitiesAndCount: EntitiesAndCount): F[Unit] = state.update( - _ :+ Report(periodStart, periodEnd, event.schema, mappings.find(_._1 == event).map(_._2).toSet.flatten) + _ :+ Report(periodStart, periodEnd, event.schema, entitiesAndCount) ) } @@ -65,12 +66,13 @@ class MetadataSpec extends Specification with CatsIO { res.map(_.event) should containTheSameElementsAs( List(SchemaKey("unknown-vendor", "unknown-name", "unknown-format", SchemaVer.Full(0, 0, 0))) ) - res.flatMap(_.entities) should containTheSameElementsAs( + res.flatMap(_.entitiesAndCount.entities) should containTheSameElementsAs( Seq( SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 0)), SchemaKey("org.w3", "PerformanceTiming", "jsonschema", SchemaVer.Full(1, 0, 0)) ) ) + res.map(_.entitiesAndCount.count) should beEqualTo(List(1)) } } @@ -92,7 +94,7 @@ class MetadataSpec extends Specification with CatsIO { "add metadata event to empty state" in { val enriched = MetadataSpec.enriched Metadata.recalculate(Map.empty, List(enriched)) should containTheSameElementsAs( - Seq((MetadataEvent(enriched) -> Set.empty)) + Seq(MetadataEvent(enriched) -> EntitiesAndCount(Set.empty, 1)) ) } @@ -101,9 +103,9 @@ class MetadataSpec extends Specification with CatsIO { val other = MetadataSpec.enriched val v1_0_1 = SchemaVer.Full(1, 0, 1) other.event_version = v1_0_1.asString - val previous = Map(MetadataEvent(enriched) -> Set.empty[SchemaKey]) + val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(Set.empty[SchemaKey], 1)) Metadata.recalculate(previous, List(other)) should containTheSameElementsAs( - previous.toSeq ++ Seq(MetadataEvent(other) -> Set.empty[SchemaKey]) + 
previous.toSeq ++ Seq(MetadataEvent(other) -> EntitiesAndCount(Set.empty[SchemaKey], 1)) ) } @@ -119,9 +121,9 @@ class MetadataSpec extends Specification with CatsIO { enrichedBis.contexts = """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-1","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}]}""" val entityBis = SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 1)) - val previous = Map(MetadataEvent(enriched) -> entities) + val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(entities, 1)) Metadata.recalculate(previous, List(enrichedBis)) should containTheSameElementsAs( - Seq(MetadataEvent(enriched) -> (entities + entityBis)) + Seq(MetadataEvent(enriched) -> EntitiesAndCount(entities + entityBis, 2)) ) } @@ -141,9 +143,9 @@ class MetadataSpec extends Specification with CatsIO { enrichedTer.contexts = """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-2","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}]}""" val entityTer = SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 2)) - val previous = Map(MetadataEvent(enriched) -> entities) + val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(entities, 1)) Metadata.recalculate(previous, List(enrichedBis, enrichedTer)) should containTheSameElementsAs( - Seq(MetadataEvent(enriched) -> (entities + entityBis + entityTer)) + Seq(MetadataEvent(enriched) -> EntitiesAndCount(entities + entityBis + entityTer, 3)) ) } } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/Aggregates.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/AggregatesSpec.scala similarity index 82% rename from modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/Aggregates.scala rename to modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/AggregatesSpec.scala index 6e6384a55..1ebe1521d 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/Aggregates.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/AggregatesSpec.scala @@ -15,15 +15,14 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2.test import cats.effect.Sync import cats.effect.concurrent.Ref import fs2.Stream -import com.snowplowanalytics.iglu.core.SchemaKey import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata -import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata.MetadataEvent +import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata.{EntitiesAndCount, MetadataEvent} -object Aggregates { - def init[F[_]: Sync] = Ref.of[F, Map[MetadataEvent, Set[SchemaKey]]](Map.empty) +object AggregatesSpec { + def init[F[_]: Sync] = Ref.of[F, Map[MetadataEvent, EntitiesAndCount]](Map.empty) - def metadata[F[_]](ref: Ref[F, Map[MetadataEvent, Set[SchemaKey]]]): Metadata[F] = + def metadata[F[_]](ref: Ref[F, Map[MetadataEvent, EntitiesAndCount]]): Metadata[F] = new Metadata[F] { def report: Stream[F, Unit] = Stream.empty.covary[F] def observe(events: List[EnrichedEvent]): F[Unit] = diff --git 
a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala index cdad0618b..ff3529e5b 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala @@ -132,8 +132,8 @@ object TestEnvironment extends CatsIO { _ <- filesResource(blocker, enrichments.flatMap(_.filesToCache).map(p => Paths.get(p._2))) counter <- Resource.eval(Counter.make[IO]) metrics = Counter.mkCounterMetrics[IO](counter)(Monad[IO], ioClock) - aggregates <- Resource.eval(Aggregates.init[IO]) - metadata = Aggregates.metadata[IO](aggregates) + aggregates <- Resource.eval(AggregatesSpec.init[IO]) + metadata = AggregatesSpec.metadata[IO](aggregates) clients = Clients.init[IO](http, Nil) sem <- Resource.eval(Semaphore[IO](1L)) assetsState <- Resource.eval(Assets.State.make(blocker, sem, clients, enrichments.flatMap(_.filesToCache))) diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala index 29e0a5de0..efadff774 100644 --- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala +++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala @@ -19,6 +19,7 @@ import cats.{Applicative, Parallel} import cats.implicits._ import cats.effect.{ExitCode, IO, IOApp, Resource, SyncIO} import fs2.kafka.CommittableConsumerRecord +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Cloud import com.snowplowanalytics.snowplow.enrich.common.fs2.Run import com.snowplowanalytics.snowplow.enrich.kafka.generated.BuildInfo @@ -60,7 +61,7 @@ object Main extends IOApp.WithContext { List.empty, _.record.value, MaxRecordSize, - None, + Some(Cloud.Azure), None ) diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisRun.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisRun.scala index 21a338883..6ee094b1c 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisRun.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisRun.scala @@ -27,8 +27,8 @@ import fs2.aws.kinesis.CommittableRecord import software.amazon.kinesis.exceptions.ShutdownException +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Cloud import com.snowplowanalytics.snowplow.enrich.common.fs2.Run -import com.snowplowanalytics.snowplow.enrich.common.fs2.Telemetry import com.snowplowanalytics.snowplow.enrich.kinesis.generated.BuildInfo @@ -56,7 +56,7 @@ object KinesisRun { List(_ => S3Client.mk[F]), getPayload, MaxRecordSize, - Some(Telemetry.Cloud.Aws), + Some(Cloud.Aws), getRuntimeRegion ) diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala index 3795525e2..09adbca4c 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala @@ -23,8 +23,8 @@ import scala.concurrent.ExecutionContext import 
com.permutive.pubsub.consumer.ConsumerRecord +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Cloud import com.snowplowanalytics.snowplow.enrich.common.fs2.Run -import com.snowplowanalytics.snowplow.enrich.common.fs2.Telemetry import com.snowplowanalytics.snowplow.enrich.pubsub.generated.BuildInfo @@ -71,7 +71,7 @@ object Main extends IOApp.WithContext { List(b => Resource.eval(GcsClient.mk[IO](b))), _.value, MaxRecordSize, - Some(Telemetry.Cloud.Gcp), + Some(Cloud.Gcp), None ) diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala index 3ab4cffb2..44584653e 100644 --- a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala +++ b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala @@ -22,7 +22,6 @@ import java.util.concurrent.{Executors, TimeUnit} import scala.concurrent.ExecutionContext import com.snowplowanalytics.snowplow.enrich.common.fs2.Run -import com.snowplowanalytics.snowplow.enrich.common.fs2.Telemetry import com.snowplowanalytics.snowplow.enrich.rabbitmq.generated.BuildInfo @@ -63,7 +62,7 @@ object Main extends IOApp.WithContext { Nil, _.data, MaxRecordSize, - Some(Telemetry.Cloud.Gcp), + None, None )
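
Illustrative sketches for reviewers follow; none of this code is part of the patch itself. First, the relocated Cloud ADT (moved from Telemetry.scala to config/io.scala so Environment and Metadata can share it, with the new Azure member). A minimal, self-contained version showing what the circe encoder emits, assuming only circe-core on the classpath:

    import io.circe.Encoder
    import io.circe.syntax._

    sealed trait Cloud
    object Cloud {
      case object Aws extends Cloud
      case object Gcp extends Cloud
      case object Azure extends Cloud

      // As in the patch: encode the case object's name, upper-cased
      implicit val encoder: Encoder[Cloud] =
        Encoder.encodeString.contramap[Cloud](_.toString.toUpperCase)
    }

    // (Cloud.Azure: Cloud).asJson.noSpaces == "\"AZURE\""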
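The behavioral core of the change is that per-event aggregates now carry a count alongside the entity set, merged through a Semigroup. A self-contained sketch of that merge, using String keys as a hypothetical stand-in for MetadataEvent (the real key type):

    import cats.implicits._
    import cats.kernel.Semigroup

    final case class EntitiesAndCount(entities: Set[String], count: Int)

    // Mirrors the instance in Metadata.scala: union the entities, sum the counts
    implicit val entitiesAndCountSemigroup: Semigroup[EntitiesAndCount] =
      Semigroup.instance[EntitiesAndCount]((x, y) =>
        EntitiesAndCount(x.entities |+| y.entities, x.count + y.count)
      )

    val previous = Map("page_view" -> EntitiesAndCount(Set("web_page/1-0-0"), 1))
    val batch    = Map("page_view" -> EntitiesAndCount(Set("web_page/1-0-1"), 1))

    // Map's |+| combines values through the Semigroup, which is exactly what
    // recalculate does with `previous |+| events.map(...).combineAll`:
    val merged = previous |+| batch
    // merged == Map("page_view" -> EntitiesAndCount(Set("web_page/1-0-0", "web_page/1-0-1"), 2))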
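On the wire, the observed_event self-describing data (schema 4-0-0, per the SchemaKey in mkWebhookEvent) gains "platform" and "eventVolume". An illustrative payload with made-up values, showing only the fields visible in this diff (fields outside the hunk left elided):

    {
      "schema": "iglu:com.snowplowanalytics.console/observed_event/jsonschema/4-0-0",
      "data": {
        ...,
        "eventVersion": "1-0-0",
        "source": "unknown-source",
        "tracker": "unknown-tracker",
        "platform": "AWS",
        "eventVolume": 42,
        "periodStart": "2023-07-12T12:00:00Z",
        "periodEnd": "2023-07-12T12:05:00Z"
      }
    }

When no cloud is configured (e.g. the RabbitMQ app, which now passes None instead of the previous incorrect Some(Telemetry.Cloud.Gcp)), "platform" falls back to the string "unknown".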
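Also useful context when reviewing eventVolume correctness: MetadataEventsRef.snapshot drains the state with getAndSet (that drain logic is untouched by this patch), so each report covers exactly one reporting window and counts are never double-reported. A tiny cats-effect 2 sketch of the drain-and-reset, with Map[String, Int] standing in for Aggregates:

    import cats.effect.IO
    import cats.effect.concurrent.Ref

    val window: IO[(Map[String, Int], Map[String, Int])] =
      for {
        ref     <- Ref.of[IO, Map[String, Int]](Map("page_view" -> 3))
        drained <- ref.getAndSet(Map.empty) // this window's aggregates, reported once
        next    <- ref.get                  // next window starts empty
      } yield (drained, next)               // (Map("page_view" -> 3), Map())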