From 8c16d84d396ffbabe52a2d3082bd56281f678a96 Mon Sep 17 00:00:00 2001
From: Łukasz Indykiewicz
Date: Thu, 5 Nov 2020 01:11:49 +0100
Subject: [PATCH] Create the stub for reading from GCS and writing to Pub/Sub

---
 build.sbt                  |  2 +
 project/Dependencies.scala | 33 ++++++------
 .../Recover.scala          | 18 +++++++
 .../Repeater.scala         |  5 +-
 .../Resources.scala        | 51 +++++++++++--------
 .../services/PubSub.scala  | 31 +++++++++--
 6 files changed, 98 insertions(+), 42 deletions(-)
 create mode 100644 repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Recover.scala

diff --git a/build.sbt b/build.sbt
index 6bd54f77..b157ea77 100644
--- a/build.sbt
+++ b/build.sbt
@@ -100,6 +100,8 @@ lazy val repeater = project
       Dependencies.gcs,
       Dependencies.pubsub,
       Dependencies.pubsubFs2Grpc,
+      Dependencies.blobstoreCore,
+      Dependencies.blobstoreGcs,
       Dependencies.slf4j,
       Dependencies.catsEffect,
       Dependencies.circeLiteral,
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 152a249a..24b56a83 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -43,6 +43,7 @@ object Dependencies {
     val fs2        = "2.4.2"
     val httpClient = "0.21.6"
     val logging    = "1.1.1"
+    val blobstore  = "0.7.3"

    /**
     * After 1.102.0 the Google Cloud Java SDK versions diverge.
@@ -88,21 +89,23 @@ object Dependencies {
   val googleOauth = "com.google.oauth-client" % "google-oauth-client" % V.googleOauth

   // Scala third-party
-  val cats          = "org.typelevel"     %% "cats-core"                % V.cats
-  val catsEffect    = "org.typelevel"     %% "cats-effect"              % V.catsEffect
-  val circe         = "io.circe"          %% "circe-core"               % V.circe
-  val circeJawn     = "io.circe"          %% "circe-jawn"               % V.circe
-  val circeLiteral  = "io.circe"          %% "circe-literal"            % V.circe
-  val circeParser   = "io.circe"          %% "circe-parser"             % V.circe
-  val decline       = "com.monovore"      %% "decline"                  % V.decline
-  val fs2           = "co.fs2"            %% "fs2-core"                 % V.fs2
-  val httpClient    = "org.http4s"        %% "http4s-async-http-client" % V.httpClient
-  val logging       = "io.chrisdavenport" %% "log4cats-slf4j"           % V.logging
-  val pubsubFs2     = "com.permutive"     %% "fs2-google-pubsub-http"   % V.pubsubFs2
-  val pubsubFs2Grpc = "com.permutive"     %% "fs2-google-pubsub-grpc"   % V.pubsubFs2
-  val scioBigQuery  = "com.spotify"       %% "scio-bigquery"            % V.scio
-  val scioCore      = "com.spotify"       %% "scio-core"                % V.scio
-  val scioRepl      = "com.spotify"       %% "scio-repl"                % V.scio
+  val cats          = "org.typelevel"            %% "cats-core"                % V.cats
+  val catsEffect    = "org.typelevel"            %% "cats-effect"              % V.catsEffect
+  val circe         = "io.circe"                 %% "circe-core"               % V.circe
+  val circeJawn     = "io.circe"                 %% "circe-jawn"               % V.circe
+  val circeLiteral  = "io.circe"                 %% "circe-literal"            % V.circe
+  val circeParser   = "io.circe"                 %% "circe-parser"             % V.circe
+  val decline       = "com.monovore"             %% "decline"                  % V.decline
+  val fs2           = "co.fs2"                   %% "fs2-core"                 % V.fs2
+  val httpClient    = "org.http4s"               %% "http4s-async-http-client" % V.httpClient
+  val logging       = "io.chrisdavenport"        %% "log4cats-slf4j"           % V.logging
+  val pubsubFs2     = "com.permutive"            %% "fs2-google-pubsub-http"   % V.pubsubFs2
+  val pubsubFs2Grpc = "com.permutive"            %% "fs2-google-pubsub-grpc"   % V.pubsubFs2
+  val scioBigQuery  = "com.spotify"              %% "scio-bigquery"            % V.scio
+  val scioCore      = "com.spotify"              %% "scio-core"                % V.scio
+  val scioRepl      = "com.spotify"              %% "scio-repl"                % V.scio
+  val blobstoreCore = "com.github.fs2-blobstore" %% "core"                     % V.blobstore
+  val blobstoreGcs  = "com.github.fs2-blobstore" %% "gcs"                      % V.blobstore

   // Scala Snowplow
   val analyticsSdk = "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % V.analyticsSdk
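Note: the two fs2-blobstore artifacts added above give the repeater a streaming Store[F] abstraction over GCS. A minimal sketch of the v0.7.x API they pull in; the bucket prefix is hypothetical and credentials are taken from the environment, neither is part of this patch:

import blobstore.Path
import blobstore.gcs.GcsStore
import cats.effect.{Blocker, ContextShift, IO}
import com.google.cloud.storage.StorageOptions

// Build a Store[IO] backed by GCS using application-default credentials,
// then stream the listing of a (hypothetical) bucket prefix.
def listFailedInserts(blocker: Blocker)(implicit cs: ContextShift[IO]): fs2.Stream[IO, Path] = {
  val store = GcsStore(StorageOptions.getDefaultInstance.getService, blocker, List.empty)
  store.list(Path("gs://my-bucket/failed-inserts/"))
}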
diff --git a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Recover.scala b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Recover.scala
new file mode 100644
index 00000000..2dcf8248
--- /dev/null
+++ b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Recover.scala
@@ -0,0 +1,18 @@
+package com.snowplowanalytics.snowplow.storage.bigquery.repeater
+
+import fs2.Stream
+import cats.effect.Concurrent
+import cats.effect.Timer
+import blobstore.Path
+
+object Recover {
+
+  def recoverFailedInserts[F[_]: Timer: Concurrent](resources: Resources[F]): Stream[F, Unit] =
+    for {
+      readFromGcs <- resources.store.get(Path("gs://foo/bar"), 1024)
+      _           <- Stream.eval(resources.pubSubProducer.produce(recover(readFromGcs)))
+    } yield ()
+
+  private def recover: Byte => EventContainer = ???
+
+}
diff --git a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala
index 9924b7cb..70445eee 100644
--- a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala
+++ b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Repeater.scala
@@ -51,9 +51,8 @@ object Repeater extends SafeIOApp {
                  .through[IO, Unit](Flow.sink(resources))
      desparatesSink = Flow.dequeueDesperates(resources)
      logging = Stream.awakeEvery[IO](5.minute).evalMap(_ => resources.updateLifetime *> resources.showStats)
-     _ <- Stream(bqSink, desparatesSink, logging).parJoin(
-       StreamConcurrency
-     )
+     recover = Recover.recoverFailedInserts(resources)
+     _ <- Stream(bqSink, desparatesSink, logging, recover).parJoin(StreamConcurrency)
    } yield ()

  process.compile.drain.attempt.flatMap {
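Note: the Recover stub compiles but is clearly a placeholder: store.get emits one Byte at a time, so recover: Byte => EventContainer would publish one Pub/Sub message per byte. One direction the stub could evolve in, assuming failed inserts land in GCS as newline-delimited JSON and that a circe Decoder[EventContainer] exists (both assumptions, not established by this patch):

import blobstore.Path
import cats.effect.Concurrent
import cats.syntax.functor._
import fs2.{text, Stream}
import io.circe.Decoder
import io.circe.parser.decode

// Decode the object as UTF-8 lines and republish each parsed event.
def recoverFailedInserts[F[_]: Concurrent](
  resources: Resources[F]
)(implicit D: Decoder[EventContainer]): Stream[F, Unit] =
  resources.store
    .get(Path("gs://foo/bar"), 4096) // same hard-coded path as the stub
    .through(text.utf8Decode)
    .through(text.lines)
    .filter(_.nonEmpty)
    .evalMap { line =>
      decode[EventContainer](line) match {
        case Right(event) => resources.pubSubProducer.produce(event).void
        case Left(_)      => Concurrent[F].unit // TODO: route parse failures to a bad-row sink
      }
    }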
diff --git a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Resources.scala b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Resources.scala
index bfda015b..dad4966d 100644
--- a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Resources.scala
+++ b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/Resources.scala
@@ -29,6 +29,10 @@ import fs2.concurrent.{Queue, SignallingRef}
 import com.snowplowanalytics.snowplow.storage.bigquery.common.Config
 import com.snowplowanalytics.snowplow.badrows.BadRow
 import com.snowplowanalytics.snowplow.storage.bigquery.repeater.RepeaterCli.GcsPath
+import com.permutive.pubsub.producer.PubsubProducer
+import blobstore.Store
+import blobstore.gcs.GcsStore
+import com.google.cloud.storage.StorageOptions

 /**
  * Resources container, allowing to manipulate all acquired entities
@@ -53,7 +57,9 @@ class Resources[F[_]: Sync](
   val backoffTime: Int,
   val concurrency: Int,
   val insertBlocker: Blocker,
-  val jobStartTime: Instant
+  val jobStartTime: Instant,
+  val pubSubProducer: PubsubProducer[F, EventContainer],
+  val store: Store[F]
 ) {
   def logInserted: F[Unit] =
     statistics.update(s => s.copy(inserted = s.inserted + 1))
@@ -90,11 +96,11 @@ object Resources {
   }

   /** Allocate all resources for an application */
-  def acquire[F[_]: ConcurrentEffect: Timer: Logger](
+  def acquire[F[_]: ContextShift: ConcurrentEffect: Timer: Logger](
     cmd: RepeaterCli.ListenCommand
   ): Resource[F, Resources[F]] = {
     // It's a function because blocker needs to be created as Resource
-    val initResources: F[Blocker => Resources[F]] = for {
+    val initResources: F[Blocker => PubsubProducer[F, EventContainer] => Resources[F]] = for {
       transformed <- Config.transform[F](cmd.config).value
       env         <- Sync[F].fromEither(transformed)
       bigQuery    <- services.Database.getClient[F]
@@ -103,31 +109,36 @@ object Resources {
       stop        <- SignallingRef[F, Boolean](false)
       statistics  <- Ref[F].of[Statistics](Statistics.start)
       concurrency <- Sync[F].delay(Runtime.getRuntime.availableProcessors * 16)
+      storage = StorageOptions.getDefaultInstance.getService
       _ <- Logger[F].info(
             s"Initializing Repeater from ${env.config.failedInserts} to ${env.config.tableId} with $concurrency streams"
           )
       jobStartTime <- Sync[F].delay(Instant.now())
     } yield (b: Blocker) =>
-      new Resources(
-        bigQuery,
-        cmd.deadEndBucket,
-        env,
-        queue,
-        counter,
-        stop,
-        statistics,
-        cmd.bufferSize,
-        cmd.window,
-        cmd.backoff,
-        concurrency,
-        b,
-        jobStartTime
-      )
+      (p: PubsubProducer[F, EventContainer]) =>
+        new Resources(
+          bigQuery,
+          cmd.deadEndBucket,
+          env,
+          queue,
+          counter,
+          stop,
+          statistics,
+          cmd.bufferSize,
+          cmd.window,
+          cmd.backoff,
+          concurrency,
+          b,
+          jobStartTime,
+          p,
+          GcsStore(storage, b, List.empty)
+        )

     val createBlocker = Sync[F].delay(Executors.newCachedThreadPool()).map(ExecutionContext.fromExecutorService)

     for {
-      blocker   <- Resource.make(createBlocker)(ec => Sync[F].delay(ec.shutdown())).map(Blocker.liftExecutionContext)
-      resources <- Resource.make(initResources.map(init => init.apply(blocker)))(release)
+      blocker        <- Resource.make(createBlocker)(ec => Sync[F].delay(ec.shutdown())).map(Blocker.liftExecutionContext)
+      pubsubProducer <- services.PubSub.getProducer[F]
+      resources      <- Resource.make(initResources.map(init => init(blocker)(pubsubProducer)))(release)
     } yield resources
   }
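Note: the curried Blocker => PubsubProducer[F, EventContainer] => Resources[F] shape follows from the fact that both the blocker and the producer are acquired as Resources, so they can only be applied inside the final for block. One detail worth flagging: StorageOptions.getDefaultInstance.getService constructs a GCS client (including a credentials lookup) but runs as a plain `=` binding in the for-comprehension above. A more cautious sketch suspends it in F like the neighbouring bindings (illustrative, not part of the patch):

import cats.effect.Sync
import com.google.cloud.storage.{Storage, StorageOptions}

// Suspend GCS client construction in F, like the other acquisitions above.
def gcsClient[F[_]: Sync]: F[Storage] =
  Sync[F].delay(StorageOptions.getDefaultInstance.getService)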
diff --git a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala
index 9ea00ba5..3c4600d5 100644
--- a/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala
+++ b/repeater/src/main/scala/com.snowplowanalytics.snowplow.storage.bigquery.repeater/services/PubSub.scala
@@ -17,15 +17,21 @@ import com.google.pubsub.v1.PubsubMessage
 import cats.effect._
 import cats.syntax.all._
 import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig}
-import com.permutive.pubsub.consumer.{ConsumerRecord, Model}
+import com.permutive.pubsub.consumer.ConsumerRecord
 import io.chrisdavenport.log4cats.Logger
 import fs2.concurrent.Queue
 import fs2.Stream

 import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload}
 import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater}
+import com.permutive.pubsub.producer.grpc.GooglePubsubProducer
+import com.permutive.pubsub.producer.encoder.MessageEncoder
+import com.permutive.pubsub.consumer
+import com.permutive.pubsub.producer
+import com.permutive.pubsub.producer.grpc.PubsubProducerConfig
+import scala.concurrent.duration._

-/** Module responsible for reading PubSub */
+/** Module responsible for reading and writing PubSub */
 object PubSub {

   /** Read events from `failedInserts` topic */
@@ -37,8 +43,8 @@ object PubSub {
   ): Stream[F, ConsumerRecord[F, EventContainer]] =
     PubsubGoogleConsumer.subscribe[F, EventContainer](
       blocker,
-      Model.ProjectId(projectId),
-      Model.Subscription(subscription),
+      consumer.Model.ProjectId(projectId),
+      consumer.Model.Subscription(subscription),
       (msg, err, ack, _) => callback[F](msg, err, ack, desperates),
       PubsubGoogleConsumerConfig[F](onFailedTerminate = t => Logger[F].error(s"Terminating consumer due $t"))
     )
@@ -49,4 +55,21 @@ object PubSub {
     val badRow = BadRow.LoaderRecoveryError(Repeater.processor, failure, Payload.RawPayload(msg.toString))
     desperates.enqueue1(badRow) >> ack
   }
+
+  implicit val encoder: MessageEncoder[EventContainer] = new MessageEncoder[EventContainer] {
+    override def encode(a: EventContainer): Either[Throwable, Array[Byte]] = ???
+  }
+
+  //TODO: provide projectId and topic
+  def getProducer[F[_]: Concurrent: Timer: Logger] =
+    GooglePubsubProducer.of[F, EventContainer](
+      producer.Model.ProjectId("test-project"),
+      producer.Model.Topic("values"),
+      config = PubsubProducerConfig[F](
+        batchSize         = 100,
+        delayThreshold    = 100.millis,
+        onFailedTerminate = e => Sync[F].delay(println(s"Got error $e"))
+      )
+    )
+
 }
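Note: the MessageEncoder[EventContainer] is still ???, and getProducer hard-codes "test-project"/"values" (flagged by the TODO; they presumably belong in the loaded config, alongside the consumer's project and subscription). A possible shape for the encoder, assuming an io.circe.Encoder[EventContainer] is available, which is an assumption rather than something this patch provides:

import java.nio.charset.StandardCharsets

import com.permutive.pubsub.producer.encoder.MessageEncoder
import io.circe.Encoder
import io.circe.syntax._

// Serialize any circe-encodable A to UTF-8 JSON bytes for Pub/Sub.
def circeMessageEncoder[A: Encoder]: MessageEncoder[A] =
  new MessageEncoder[A] {
    override def encode(a: A): Either[Throwable, Array[Byte]] =
      Right(a.asJson.noSpaces.getBytes(StandardCharsets.UTF_8))
  }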