From 7fda513ad6d4897bca60b3188f7bae29e7f49c69 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Wed, 12 Jul 2023 14:01:01 +0200 Subject: [PATCH] Add eventVolume and platform to observed_event (close #795) --- .../enrich/common/fs2/Environment.scala | 5 +- .../snowplow/enrich/common/fs2/Run.scala | 4 +- .../enrich/common/fs2/Telemetry.scala | 11 +-- .../enrich/common/fs2/config/io.scala | 10 +++ .../common/fs2/io/experimental/Metadata.scala | 77 +++++++++++-------- .../enrich/common/fs2/io/MetadataSpec.scala | 31 +++++--- ...{Aggregates.scala => AggregatesSpec.scala} | 9 +-- .../common/fs2/test/TestEnvironment.scala | 4 +- .../Main.scala | 3 +- .../snowplow/enrich/kinesis/KinesisRun.scala | 4 +- .../snowplow/enrich/pubsub/Main.scala | 4 +- .../snowplow/enrich/rabbitmq/Main.scala | 3 +- 12 files changed, 95 insertions(+), 70 deletions(-) rename modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/{Aggregates.scala => AggregatesSpec.scala} (82%) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala index e26470c65..8c4e2ac5c 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala @@ -43,6 +43,7 @@ import com.snowplowanalytics.snowplow.enrich.common.utils.{BlockerF, HttpClient, import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, ParsedConfigs} import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{ + Cloud, Concurrency, FeatureFlags, RemoteAdapterConfigs, @@ -124,7 +125,7 @@ final case class Environment[F[_], A]( processor: Processor, streamsSettings: Environment.StreamsSettings, region: Option[String], - cloud: Option[Telemetry.Cloud], + cloud: Option[Cloud], featureFlags: FeatureFlags ) @@ -179,7 +180,7 @@ object Environment { getPayload: A => Array[Byte], processor: Processor, maxRecordSize: Int, - cloud: Option[Telemetry.Cloud], + cloud: Option[Cloud], getRegion: => Option[String], featureFlags: FeatureFlags ): Resource[F, Environment[F, A]] = { diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala index 7cefda86b..241c177bc 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala @@ -28,7 +28,7 @@ import retry.syntax.all._ import com.snowplowanalytics.snowplow.badrows.Processor -import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Input, Monitoring, Output, RetryCheckpointing} +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Cloud, Input, Monitoring, Output, RetryCheckpointing} import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{CliConfig, ParsedConfigs} import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{FileSink, Retries, Source} import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client @@ -54,7 +54,7 @@ object Run { mkClients: List[Blocker => Resource[F, Client[F]]], getPayload: A => Array[Byte], maxRecordSize: Int, - cloud: Option[Telemetry.Cloud], + cloud: Option[Cloud], getRegion: => Option[String] ): F[ExitCode] = CliConfig.command(name, version, description).parse(args) match { diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Telemetry.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Telemetry.scala index 2cee8eda6..1f2b49434 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Telemetry.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Telemetry.scala @@ -25,7 +25,6 @@ import fs2.Stream import org.http4s.client.{Client => HttpClient} import _root_.io.circe.Json -import _root_.io.circe.Encoder import _root_.io.circe.syntax._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} @@ -36,6 +35,7 @@ import com.snowplowanalytics.snowplow.scalatracker.Emitter.{Result => TrackerRes import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.Http4sEmitter import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Telemetry => TelemetryConfig} +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Cloud object Telemetry { @@ -119,13 +119,4 @@ object Telemetry { "applicationVersion" -> appVersion.asJson ) ) - - sealed trait Cloud - - object Cloud { - case object Aws extends Cloud - case object Gcp extends Cloud - - implicit val encoder: Encoder[Cloud] = Encoder.encodeString.contramap[Cloud](_.toString().toUpperCase()) - } } diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala index ac578634f..53b89971b 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala @@ -583,4 +583,14 @@ object io { implicit val veroSchemasEncoder: Encoder[VeroSchemas] = deriveConfiguredEncoder[VeroSchemas] } + + sealed trait Cloud + + object Cloud { + case object Aws extends Cloud + case object Gcp extends Cloud + case object Azure extends Cloud + + implicit val encoder: Encoder[Cloud] = Encoder.encodeString.contramap[Cloud](_.toString().toUpperCase()) + } } diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala index 796b12880..73093a947 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/experimental/Metadata.scala @@ -21,6 +21,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger import cats.implicits._ import cats.Applicative import cats.data.NonEmptyList +import cats.kernel.Semigroup import cats.effect.{Async, Clock, ConcurrentEffect, ContextShift, Resource, Sync, Timer} import cats.effect.concurrent.Ref import fs2.Stream @@ -49,7 +50,17 @@ trait Metadata[F[_]] { } object Metadata { - type EventsToEntities = Map[MetadataEvent, Set[SchemaKey]] + type Aggregates = Map[MetadataEvent, EntitiesAndCount] + case class EntitiesAndCount(entities: Set[SchemaKey], count: Int) + + implicit private def entitiesAndCountSemigroup: Semigroup[EntitiesAndCount] = + new Semigroup[EntitiesAndCount] { + override def combine(x: EntitiesAndCount, y: EntitiesAndCount): EntitiesAndCount = + EntitiesAndCount( + x.entities |+| y.entities, + x.count + y.count + ) + } private implicit def unsafeLogger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] @@ -69,7 +80,7 @@ object Metadata { } yield () def observe(events: List[EnrichedEvent]): F[Unit] = - observedRef.eventsToEntities.update(recalculate(_, events)) + observedRef.aggregates.update(recalculate(_, events)) } } @@ -82,17 +93,18 @@ object Metadata { private def submit[F[_]: Sync: Clock](reporter: MetadataReporter[F], ref: MetadataEventsRef[F]): F[Unit] = for { snapshot <- MetadataEventsRef.snapshot(ref) - _ <- snapshot.eventsToEntities.keySet.toList - .traverse(reporter.report(snapshot.periodStart, snapshot.periodEnd)(_, snapshot.eventsToEntities)) + _ <- snapshot.aggregates.toList.traverse { + case (event, entitiesAndCount) => + reporter.report(snapshot.periodStart, snapshot.periodEnd, event, entitiesAndCount) + } } yield () trait MetadataReporter[F[_]] { def report( periodStart: Instant, - periodEnd: Instant - )( - snapshot: MetadataEvent, - eventsToEntities: EventsToEntities + periodEnd: Instant, + event: MetadataEvent, + entitiesAndCount: EntitiesAndCount ): F[Unit] } @@ -121,16 +133,15 @@ object Metadata { def report( periodStart: Instant, - periodEnd: Instant - )( + periodEnd: Instant, event: MetadataEvent, - eventsToEntities: EventsToEntities + entitiesAndCount: EntitiesAndCount ): F[Unit] = initTracker(config, appName, client).use { t => Logger[F].info(s"Tracking observed event ${event.schema.toSchemaUri}") >> t.trackSelfDescribingEvent( - mkWebhookEvent(config.organizationId, config.pipelineId, periodStart, periodEnd, event), - eventsToEntities.find(_._1 == event).map(_._2).toSeq.flatMap(mkWebhookContexts) + mkWebhookEvent(config.organizationId, config.pipelineId, periodStart, periodEnd, event, entitiesAndCount.count), + mkWebhookContexts(entitiesAndCount.entities).toSeq ) >> t.flushEmitters() } @@ -159,11 +170,13 @@ object Metadata { * @param schema - schema key of given event * @param source - `app_id` for given event * @param tracker - `v_tracker` for given event + * @param platform - The platform the app runs on for given event (`platform` field) */ case class MetadataEvent( schema: SchemaKey, source: Option[String], - tracker: Option[String] + tracker: Option[String], + platform: Option[String] ) object MetadataEvent { def apply(event: EnrichedEvent): MetadataEvent = @@ -175,30 +188,31 @@ object Metadata { Option(event.event_version).toRight("unknown-version").flatMap(SchemaVer.parseFull).getOrElse(SchemaVer.Full(0, 0, 0)) ), Option(event.app_id), - Option(event.v_tracker) + Option(event.v_tracker), + Option(event.platform) ) } /** * An representation of observed metadata events and entites attached to them over a period of time * - * @param eventsToEntities - mappings of entities observed (since `periodStart`) for given `MetadataEvent`s - * @param periodStart - since when `eventsToEntities` are accumulated - * @param periodEnd - until when `eventsToEntities` are accumulated + * @param aggregates - mappings of entities observed (since `periodStart`) for given `MetadataEvent`s + * @param periodStart - since when `aggregates` are accumulated + * @param periodEnd - until when `aggregates` are accumulated */ case class MetadataSnapshot( - eventsToEntities: EventsToEntities, + aggregates: Aggregates, periodStart: Instant, periodEnd: Instant ) /** * Internal state representation for current metadata period - * @param eventsToEntities - mappings of entities observed (since `periodStart`) for given `MetadataEvent`s - * @param periodStart - since when `eventsToEntities` are accumulated + * @param aggregates - mappings of entities observed (since `periodStart`) for given `MetadataEvent`s + * @param periodStart - since when `aggregates` are accumulated */ case class MetadataEventsRef[F[_]]( - eventsToEntities: Ref[F, EventsToEntities], + aggregates: Ref[F, Aggregates], periodStart: Ref[F, Instant] ) @@ -206,15 +220,15 @@ object Metadata { def init[F[_]: Sync: Clock]: F[MetadataEventsRef[F]] = for { time <- Clock[F].instantNow - eventsToEntities <- Ref.of[F, EventsToEntities](Map.empty) + aggregates <- Ref.of[F, Aggregates](Map.empty) periodStart <- Ref.of[F, Instant](time) - } yield MetadataEventsRef(eventsToEntities, periodStart) + } yield MetadataEventsRef(aggregates, periodStart) def snapshot[F[_]: Sync: Clock](ref: MetadataEventsRef[F]): F[MetadataSnapshot] = for { periodEnd <- Clock[F].instantNow - eventsToEntities <- ref.eventsToEntities.getAndSet(Map.empty) + aggregates <- ref.aggregates.getAndSet(Map.empty) periodStart <- ref.periodStart.getAndSet(periodEnd) - } yield MetadataSnapshot(eventsToEntities, periodStart, periodEnd) + } yield MetadataSnapshot(aggregates, periodStart, periodEnd) } def unwrapEntities(event: EnrichedEvent): Set[SchemaKey] = { @@ -240,18 +254,19 @@ object Metadata { SchemaVer.parseFull(event.event_version).getOrElse(SchemaVer.Full(0, 0, 0)) ) - def recalculate(previous: EventsToEntities, events: List[EnrichedEvent]): EventsToEntities = - previous |+| events.map(e => Map(MetadataEvent(e) -> unwrapEntities(e))).combineAll + def recalculate(previous: Aggregates, events: List[EnrichedEvent]): Aggregates = + previous |+| events.map(e => Map(MetadataEvent(e) -> EntitiesAndCount(unwrapEntities(e), 1))).combineAll def mkWebhookEvent( organizationId: UUID, pipelineId: UUID, periodStart: Instant, periodEnd: Instant, - event: MetadataEvent + event: MetadataEvent, + count: Int ): SelfDescribingData[Json] = SelfDescribingData( - SchemaKey("com.snowplowanalytics.console", "observed_event", "jsonschema", SchemaVer.Full(4, 0, 0)), + SchemaKey("com.snowplowanalytics.console", "observed_event", "jsonschema", SchemaVer.Full(6, 0, 0)), Json.obj( "organizationId" -> organizationId.asJson, "pipelineId" -> pipelineId.asJson, @@ -260,6 +275,8 @@ object Metadata { "eventVersion" -> event.schema.version.asString.asJson, "source" -> event.source.getOrElse("unknown-source").asJson, "tracker" -> event.tracker.getOrElse("unknown-tracker").asJson, + "platform" -> event.platform.getOrElse("unknown-platform").asJson, + "eventVolume" -> Json.fromInt(count), "periodStart" -> periodStart.asJson, "periodEnd" -> periodEnd.asJson ) diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/MetadataSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/MetadataSpec.scala index 450ff6000..c9a71f57c 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/MetadataSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/MetadataSpec.scala @@ -27,19 +27,25 @@ import org.specs2.mutable.Specification import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Metadata => MetadataConfig} -import Metadata.{MetadataEvent, MetadataReporter} +import Metadata.{EntitiesAndCount, MetadataEvent, MetadataReporter} class MetadataSpec extends Specification with CatsIO { case class Report( periodStart: Instant, periodEnd: Instant, event: SchemaKey, - entities: Set[SchemaKey] + entitiesAndCount: EntitiesAndCount ) case class TestReporter[F[_]](state: Ref[F, List[Report]]) extends MetadataReporter[F] { - def report(periodStart: Instant, periodEnd: Instant)(event: MetadataEvent, mappings: Map[MetadataEvent, Set[SchemaKey]]): F[Unit] = + + def report( + periodStart: Instant, + periodEnd: Instant, + event: MetadataEvent, + entitiesAndCount: EntitiesAndCount + ): F[Unit] = state.update( - _ :+ Report(periodStart, periodEnd, event.schema, mappings.find(_._1 == event).map(_._2).toSet.flatten) + _ :+ Report(periodStart, periodEnd, event.schema, entitiesAndCount) ) } @@ -65,12 +71,13 @@ class MetadataSpec extends Specification with CatsIO { res.map(_.event) should containTheSameElementsAs( List(SchemaKey("unknown-vendor", "unknown-name", "unknown-format", SchemaVer.Full(0, 0, 0))) ) - res.flatMap(_.entities) should containTheSameElementsAs( + res.flatMap(_.entitiesAndCount.entities) should containTheSameElementsAs( Seq( SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 0)), SchemaKey("org.w3", "PerformanceTiming", "jsonschema", SchemaVer.Full(1, 0, 0)) ) ) + res.map(_.entitiesAndCount.count) should beEqualTo(List(1)) } } @@ -92,7 +99,7 @@ class MetadataSpec extends Specification with CatsIO { "add metadata event to empty state" in { val enriched = MetadataSpec.enriched Metadata.recalculate(Map.empty, List(enriched)) should containTheSameElementsAs( - Seq((MetadataEvent(enriched) -> Set.empty)) + Seq(MetadataEvent(enriched) -> EntitiesAndCount(Set.empty, 1)) ) } @@ -101,9 +108,9 @@ class MetadataSpec extends Specification with CatsIO { val other = MetadataSpec.enriched val v1_0_1 = SchemaVer.Full(1, 0, 1) other.event_version = v1_0_1.asString - val previous = Map(MetadataEvent(enriched) -> Set.empty[SchemaKey]) + val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(Set.empty[SchemaKey], 1)) Metadata.recalculate(previous, List(other)) should containTheSameElementsAs( - previous.toSeq ++ Seq(MetadataEvent(other) -> Set.empty[SchemaKey]) + previous.toSeq ++ Seq(MetadataEvent(other) -> EntitiesAndCount(Set.empty[SchemaKey], 1)) ) } @@ -119,9 +126,9 @@ class MetadataSpec extends Specification with CatsIO { enrichedBis.contexts = """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-1","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}]}""" val entityBis = SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 1)) - val previous = Map(MetadataEvent(enriched) -> entities) + val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(entities, 1)) Metadata.recalculate(previous, List(enrichedBis)) should containTheSameElementsAs( - Seq(MetadataEvent(enriched) -> (entities + entityBis)) + Seq(MetadataEvent(enriched) -> EntitiesAndCount(entities + entityBis, 2)) ) } @@ -141,9 +148,9 @@ class MetadataSpec extends Specification with CatsIO { enrichedTer.contexts = """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-2","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}]}""" val entityTer = SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 2)) - val previous = Map(MetadataEvent(enriched) -> entities) + val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(entities, 1)) Metadata.recalculate(previous, List(enrichedBis, enrichedTer)) should containTheSameElementsAs( - Seq(MetadataEvent(enriched) -> (entities + entityBis + entityTer)) + Seq(MetadataEvent(enriched) -> EntitiesAndCount(entities + entityBis + entityTer, 3)) ) } } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/Aggregates.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/AggregatesSpec.scala similarity index 82% rename from modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/Aggregates.scala rename to modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/AggregatesSpec.scala index 6e6384a55..1ebe1521d 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/Aggregates.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/AggregatesSpec.scala @@ -15,15 +15,14 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2.test import cats.effect.Sync import cats.effect.concurrent.Ref import fs2.Stream -import com.snowplowanalytics.iglu.core.SchemaKey import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata -import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata.MetadataEvent +import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata.{EntitiesAndCount, MetadataEvent} -object Aggregates { - def init[F[_]: Sync] = Ref.of[F, Map[MetadataEvent, Set[SchemaKey]]](Map.empty) +object AggregatesSpec { + def init[F[_]: Sync] = Ref.of[F, Map[MetadataEvent, EntitiesAndCount]](Map.empty) - def metadata[F[_]](ref: Ref[F, Map[MetadataEvent, Set[SchemaKey]]]): Metadata[F] = + def metadata[F[_]](ref: Ref[F, Map[MetadataEvent, EntitiesAndCount]]): Metadata[F] = new Metadata[F] { def report: Stream[F, Unit] = Stream.empty.covary[F] def observe(events: List[EnrichedEvent]): F[Unit] = diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala index 572a10ab4..131895ad1 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala @@ -133,8 +133,8 @@ object TestEnvironment extends CatsIO { _ <- filesResource(blocker, enrichments.flatMap(_.filesToCache).map(p => Paths.get(p._2))) counter <- Resource.eval(Counter.make[IO]) metrics = Counter.mkCounterMetrics[IO](counter)(Monad[IO], ioClock) - aggregates <- Resource.eval(Aggregates.init[IO]) - metadata = Aggregates.metadata[IO](aggregates) + aggregates <- Resource.eval(AggregatesSpec.init[IO]) + metadata = AggregatesSpec.metadata[IO](aggregates) clients = Clients.init[IO](http, Nil) sem <- Resource.eval(Semaphore[IO](1L)) assetsState <- Resource.eval(Assets.State.make(blocker, sem, clients, enrichments.flatMap(_.filesToCache))) diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala index 29e0a5de0..efadff774 100644 --- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala +++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala @@ -19,6 +19,7 @@ import cats.{Applicative, Parallel} import cats.implicits._ import cats.effect.{ExitCode, IO, IOApp, Resource, SyncIO} import fs2.kafka.CommittableConsumerRecord +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Cloud import com.snowplowanalytics.snowplow.enrich.common.fs2.Run import com.snowplowanalytics.snowplow.enrich.kafka.generated.BuildInfo @@ -60,7 +61,7 @@ object Main extends IOApp.WithContext { List.empty, _.record.value, MaxRecordSize, - None, + Some(Cloud.Azure), None ) diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisRun.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisRun.scala index 21a338883..6ee094b1c 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisRun.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/KinesisRun.scala @@ -27,8 +27,8 @@ import fs2.aws.kinesis.CommittableRecord import software.amazon.kinesis.exceptions.ShutdownException +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Cloud import com.snowplowanalytics.snowplow.enrich.common.fs2.Run -import com.snowplowanalytics.snowplow.enrich.common.fs2.Telemetry import com.snowplowanalytics.snowplow.enrich.kinesis.generated.BuildInfo @@ -56,7 +56,7 @@ object KinesisRun { List(_ => S3Client.mk[F]), getPayload, MaxRecordSize, - Some(Telemetry.Cloud.Aws), + Some(Cloud.Aws), getRuntimeRegion ) diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala index 3795525e2..09adbca4c 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Main.scala @@ -23,8 +23,8 @@ import scala.concurrent.ExecutionContext import com.permutive.pubsub.consumer.ConsumerRecord +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Cloud import com.snowplowanalytics.snowplow.enrich.common.fs2.Run -import com.snowplowanalytics.snowplow.enrich.common.fs2.Telemetry import com.snowplowanalytics.snowplow.enrich.pubsub.generated.BuildInfo @@ -71,7 +71,7 @@ object Main extends IOApp.WithContext { List(b => Resource.eval(GcsClient.mk[IO](b))), _.value, MaxRecordSize, - Some(Telemetry.Cloud.Gcp), + Some(Cloud.Gcp), None ) diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala index 3ab4cffb2..44584653e 100644 --- a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala +++ b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala @@ -22,7 +22,6 @@ import java.util.concurrent.{Executors, TimeUnit} import scala.concurrent.ExecutionContext import com.snowplowanalytics.snowplow.enrich.common.fs2.Run -import com.snowplowanalytics.snowplow.enrich.common.fs2.Telemetry import com.snowplowanalytics.snowplow.enrich.rabbitmq.generated.BuildInfo @@ -63,7 +62,7 @@ object Main extends IOApp.WithContext { Nil, _.data, MaxRecordSize, - Some(Telemetry.Cloud.Gcp), + None, None )