Create the stub for reading from GCS and writing to Pub/Sub
lukeindykiewicz committed Nov 5, 2020
1 parent 922ca9a commit 8c16d84
Showing 6 changed files with 98 additions and 42 deletions.
2 changes: 2 additions & 0 deletions build.sbt
@@ -100,6 +100,8 @@ lazy val repeater = project
Dependencies.gcs,
Dependencies.pubsub,
Dependencies.pubsubFs2Grpc,
+Dependencies.blobstoreCore,
+Dependencies.blobstoreGcs,
Dependencies.slf4j,
Dependencies.catsEffect,
Dependencies.circeLiteral,
33 changes: 18 additions & 15 deletions project/Dependencies.scala
@@ -43,6 +43,7 @@ object Dependencies {
val fs2 = "2.4.2"
val httpClient = "0.21.6"
val logging = "1.1.1"
+val blobstore = "0.7.3"

/**
* After 1.102.0 the Google Cloud Java SDK versions diverge.
@@ -88,21 +89,23 @@
val googleOauth = "com.google.oauth-client" % "google-oauth-client" % V.googleOauth

// Scala third-party
-val cats = "org.typelevel" %% "cats-core" % V.cats
-val catsEffect = "org.typelevel" %% "cats-effect" % V.catsEffect
-val circe = "io.circe" %% "circe-core" % V.circe
-val circeJawn = "io.circe" %% "circe-jawn" % V.circe
-val circeLiteral = "io.circe" %% "circe-literal" % V.circe
-val circeParser = "io.circe" %% "circe-parser" % V.circe
-val decline = "com.monovore" %% "decline" % V.decline
-val fs2 = "co.fs2" %% "fs2-core" % V.fs2
-val httpClient = "org.http4s" %% "http4s-async-http-client" % V.httpClient
-val logging = "io.chrisdavenport" %% "log4cats-slf4j" % V.logging
-val pubsubFs2 = "com.permutive" %% "fs2-google-pubsub-http" % V.pubsubFs2
-val pubsubFs2Grpc = "com.permutive" %% "fs2-google-pubsub-grpc" % V.pubsubFs2
-val scioBigQuery = "com.spotify" %% "scio-bigquery" % V.scio
-val scioCore = "com.spotify" %% "scio-core" % V.scio
-val scioRepl = "com.spotify" %% "scio-repl" % V.scio
+val cats = "org.typelevel" %% "cats-core" % V.cats
+val catsEffect = "org.typelevel" %% "cats-effect" % V.catsEffect
+val circe = "io.circe" %% "circe-core" % V.circe
+val circeJawn = "io.circe" %% "circe-jawn" % V.circe
+val circeLiteral = "io.circe" %% "circe-literal" % V.circe
+val circeParser = "io.circe" %% "circe-parser" % V.circe
+val decline = "com.monovore" %% "decline" % V.decline
+val fs2 = "co.fs2" %% "fs2-core" % V.fs2
+val httpClient = "org.http4s" %% "http4s-async-http-client" % V.httpClient
+val logging = "io.chrisdavenport" %% "log4cats-slf4j" % V.logging
+val pubsubFs2 = "com.permutive" %% "fs2-google-pubsub-http" % V.pubsubFs2
+val pubsubFs2Grpc = "com.permutive" %% "fs2-google-pubsub-grpc" % V.pubsubFs2
+val scioBigQuery = "com.spotify" %% "scio-bigquery" % V.scio
+val scioCore = "com.spotify" %% "scio-core" % V.scio
+val scioRepl = "com.spotify" %% "scio-repl" % V.scio
+val blobstoreCore = "com.github.fs2-blobstore" %% "core" % V.blobstore
+val blobstoreGcs = "com.github.fs2-blobstore" %% "gcs" % V.blobstore

// Scala Snowplow
val analyticsSdk = "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % V.analyticsSdk
18 changes: 18 additions & 0 deletions repeater/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/repeater/Recover.scala
@@ -0,0 +1,18 @@
+package com.snowplowanalytics.snowplow.storage.bigquery.repeater
+
+import fs2.Stream
+import cats.effect.Concurrent
+import cats.effect.Timer
+import blobstore.Path
+
+object Recover {
+
+  def recoverFailedInserts[F[_]: Timer: Concurrent](resources: Resources[F]): Stream[F, Unit] =
+    for {
+      readFromGcs <- resources.store.get(Path("gs://foo/bar"), 1024)
+      _           <- Stream.eval(resources.pubSubProducer.produce(recover(readFromGcs)))
+    } yield ()
+
+  private def recover: Byte => EventContainer = ???
+
+}
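A note on the stub above, beyond what the commit itself says: `store.get` emits individual bytes, so `recover: Byte => EventContainer` as typed cannot reassemble whole events. A minimal sketch of one way the stub could evolve, assuming the failed inserts are stored in GCS as newline-delimited JSON and a circe `Decoder[EventContainer]` is in scope (neither assumption is confirmed by this commit; `gs://foo/bar` is the stub's own placeholder path):

import cats.effect.{Concurrent, Sync, Timer}
import cats.syntax.all._
import fs2.Stream
import blobstore.Path
import io.circe.parser.decode

object RecoverSketch {
  // Hypothetical: read the object as bytes, split into lines, decode each
  // line as an EventContainer, and republish it to Pub/Sub.
  def recoverFailedInserts[F[_]: Timer: Concurrent](resources: Resources[F]): Stream[F, Unit] =
    resources.store
      .get(Path("gs://foo/bar"), 1024)   // stream of raw bytes from GCS
      .through(fs2.text.utf8Decode)
      .through(fs2.text.lines)           // one JSON document per line
      .evalMap { line =>
        Sync[F]
          .fromEither(decode[EventContainer](line)) // assumes a Decoder[EventContainer]
          .flatMap(event => resources.pubSubProducer.produce(event).void)
      }
}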
5 changes: 2 additions & 3 deletions repeater/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/repeater/Repeater.scala
@@ -51,9 +51,8 @@ object Repeater extends SafeIOApp {
.through[IO, Unit](Flow.sink(resources))
desparatesSink = Flow.dequeueDesperates(resources)
logging = Stream.awakeEvery[IO](5.minute).evalMap(_ => resources.updateLifetime *> resources.showStats)
-_ <- Stream(bqSink, desparatesSink, logging).parJoin(
-StreamConcurrency
-)
+recover = Recover.recoverFailedInserts(resources)
+_ <- Stream(bqSink, desparatesSink, logging, recover).parJoin(StreamConcurrency)
} yield ()

process.compile.drain.attempt.flatMap {
51 changes: 31 additions & 20 deletions repeater/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/repeater/Resources.scala
@@ -29,6 +29,10 @@ import fs2.concurrent.{Queue, SignallingRef}
import com.snowplowanalytics.snowplow.storage.bigquery.common.Config
import com.snowplowanalytics.snowplow.badrows.BadRow
import com.snowplowanalytics.snowplow.storage.bigquery.repeater.RepeaterCli.GcsPath
+import com.permutive.pubsub.producer.PubsubProducer
+import blobstore.Store
+import blobstore.gcs.GcsStore
+import com.google.cloud.storage.StorageOptions

/**
* Resources container, allowing to manipulate all acquired entities
@@ -53,7 +57,9 @@ class Resources[F[_]: Sync](
val backoffTime: Int,
val concurrency: Int,
val insertBlocker: Blocker,
-val jobStartTime: Instant
+val jobStartTime: Instant,
+val pubSubProducer: PubsubProducer[F, EventContainer],
+val store: Store[F]
) {
def logInserted: F[Unit] =
statistics.update(s => s.copy(inserted = s.inserted + 1))
@@ -90,11 +96,11 @@ object Resources {
}

/** Allocate all resources for an application */
-def acquire[F[_]: ConcurrentEffect: Timer: Logger](
+def acquire[F[_]: ContextShift: ConcurrentEffect: Timer: Logger](
cmd: RepeaterCli.ListenCommand
): Resource[F, Resources[F]] = {
// It's a function because blocker needs to be created as Resource
-val initResources: F[Blocker => Resources[F]] = for {
+val initResources: F[Blocker => PubsubProducer[F, EventContainer] => Resources[F]] = for {
transformed <- Config.transform[F](cmd.config).value
env <- Sync[F].fromEither(transformed)
bigQuery <- services.Database.getClient[F]
@@ -103,31 +109,36 @@
stop <- SignallingRef[F, Boolean](false)
statistics <- Ref[F].of[Statistics](Statistics.start)
concurrency <- Sync[F].delay(Runtime.getRuntime.availableProcessors * 16)
+storage = StorageOptions.getDefaultInstance.getService
_ <- Logger[F].info(
s"Initializing Repeater from ${env.config.failedInserts} to ${env.config.tableId} with $concurrency streams"
)
jobStartTime <- Sync[F].delay(Instant.now())
} yield (b: Blocker) =>
-new Resources(
-bigQuery,
-cmd.deadEndBucket,
-env,
-queue,
-counter,
-stop,
-statistics,
-cmd.bufferSize,
-cmd.window,
-cmd.backoff,
-concurrency,
-b,
-jobStartTime
-)
+(p: PubsubProducer[F, EventContainer]) =>
+new Resources(
+bigQuery,
+cmd.deadEndBucket,
+env,
+queue,
+counter,
+stop,
+statistics,
+cmd.bufferSize,
+cmd.window,
+cmd.backoff,
+concurrency,
+b,
+jobStartTime,
+p,
+GcsStore(storage, b, List.empty)
+)

val createBlocker = Sync[F].delay(Executors.newCachedThreadPool()).map(ExecutionContext.fromExecutorService)
for {
-blocker <- Resource.make(createBlocker)(ec => Sync[F].delay(ec.shutdown())).map(Blocker.liftExecutionContext)
-resources <- Resource.make(initResources.map(init => init.apply(blocker)))(release)
+blocker <- Resource.make(createBlocker)(ec => Sync[F].delay(ec.shutdown())).map(Blocker.liftExecutionContext)
+pubsubProducer <- services.PubSub.getProducer[F]
+resources <- Resource.make(initResources.map(init => init(blocker)(pubsubProducer)))(release)
} yield resources
}
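For orientation (not part of the commit): `GcsStore(storage, b, List.empty)` wraps the default GCS client in an fs2-blobstore `Store[F]`, using the blocker for the blocking GCS calls and an empty ACL list for written blobs. A rough smoke test of that wiring might look like the sketch below, where the bucket path is a placeholder of ours, not anything from the repo:

import blobstore.{Path, Store}
import blobstore.gcs.GcsStore
import cats.effect.{Blocker, ContextShift, IO}
import com.google.cloud.storage.StorageOptions

// Hypothetical check that the store can read an object; "gs://my-bucket/key"
// is for illustration only.
def byteCount(blocker: Blocker)(implicit cs: ContextShift[IO]): IO[Long] = {
  val store: Store[IO] = GcsStore(StorageOptions.getDefaultInstance.getService, blocker, List.empty)
  store.get(Path("gs://my-bucket/key"), 4096).compile.fold(0L)((n, _) => n + 1)
}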

31 changes: 27 additions & 4 deletions repeater/src/main/scala/com/snowplowanalytics/snowplow/storage/bigquery/repeater/services/PubSub.scala
@@ -17,15 +17,21 @@ import com.google.pubsub.v1.PubsubMessage
import cats.effect._
import cats.syntax.all._
import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig}
-import com.permutive.pubsub.consumer.{ConsumerRecord, Model}
+import com.permutive.pubsub.consumer.ConsumerRecord
import io.chrisdavenport.log4cats.Logger
import fs2.concurrent.Queue
import fs2.Stream

import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload}
import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater}
+import com.permutive.pubsub.producer.grpc.GooglePubsubProducer
+import com.permutive.pubsub.producer.encoder.MessageEncoder
+import com.permutive.pubsub.consumer
+import com.permutive.pubsub.producer
+import com.permutive.pubsub.producer.grpc.PubsubProducerConfig
+import scala.concurrent.duration._

-/** Module responsible for reading PubSub */
+/** Module responsible for reading and writing PubSub */
object PubSub {

/** Read events from `failedInserts` topic */
Expand All @@ -37,8 +43,8 @@ object PubSub {
): Stream[F, ConsumerRecord[F, EventContainer]] =
PubsubGoogleConsumer.subscribe[F, EventContainer](
blocker,
-Model.ProjectId(projectId),
-Model.Subscription(subscription),
+consumer.Model.ProjectId(projectId),
+consumer.Model.Subscription(subscription),
(msg, err, ack, _) => callback[F](msg, err, ack, desperates),
PubsubGoogleConsumerConfig[F](onFailedTerminate = t => Logger[F].error(s"Terminating consumer due $t"))
)
@@ -49,4 +55,21 @@
val badRow = BadRow.LoaderRecoveryError(Repeater.processor, failure, Payload.RawPayload(msg.toString))
desperates.enqueue1(badRow) >> ack
}

+implicit val encoder: MessageEncoder[EventContainer] = new MessageEncoder[EventContainer] {
+override def encode(a: EventContainer): Either[Throwable, Array[Byte]] = ???
+}

+//TODO: provide projectId and topic
+def getProducer[F[_]: Concurrent: Timer: Logger] =
+GooglePubsubProducer.of[F, EventContainer](
+producer.Model.ProjectId("test-project"),
+producer.Model.Topic("values"),
+config = PubsubProducerConfig[F](
+batchSize = 100,
+delayThreshold = 100.millis,
+onFailedTerminate = e => Logger[F].error(s"Got error $e")
+)
+)

}
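Two loose ends in this file are worth flagging. First, the `MessageEncoder` is left as `???`. A minimal sketch of one way to fill it in, assuming `EventContainer` has (or can derive) an `io.circe.Encoder` — which this commit does not show — is to serialize to UTF-8 JSON bytes:

import java.nio.charset.StandardCharsets

import com.permutive.pubsub.producer.encoder.MessageEncoder
import io.circe.Encoder
import io.circe.syntax._

// Hypothetical generic encoder: any A with a circe Encoder becomes JSON bytes.
def jsonEncoder[A: Encoder]: MessageEncoder[A] = new MessageEncoder[A] {
  override def encode(a: A): Either[Throwable, Array[Byte]] =
    Right(a.asJson.noSpaces.getBytes(StandardCharsets.UTF_8))
}

Second, `getProducer` still hard-codes `test-project`/`values` (per its TODO), and the batching settings look like placeholders: with `batchSize = 100` and `delayThreshold = 100.millis`, buffered messages are presumably flushed once 100 accumulate or 100 ms elapse, whichever comes first.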
