From 5d810230db52fb8675b8499e5c1ea2e2f9171742 Mon Sep 17 00:00:00 2001 From: Sami Dalouche Date: Wed, 30 Aug 2023 08:58:17 -0600 Subject: [PATCH 1/3] chore: use scalafix to update to CE3 ecosystem --- .../connectors/facebook/Resources.scala | 16 ++++------ .../connectors/facebook/routes/Logging.scala | 2 +- .../routes/profiles/ProfileRoutes.scala | 2 +- .../facebook/routes/tokens/TokenRoutes.scala | 2 +- backend/build.sbt | 1 + backend/project/plugins.sbt | 1 + .../facebook/services/ApiClient.scala | 2 +- .../facebook/services/AppApiClient.scala | 2 +- .../facebook/services/FacebookClient.scala | 5 ++-- .../services/TokenEncryptionService.scala | 2 +- .../facebook/DeliveryProcessor.scala | 8 ++--- .../connectors/facebook/Resources.scala | 30 +++++++------------ 12 files changed, 30 insertions(+), 43 deletions(-) diff --git a/backend/api/src/main/scala/io/narrative/connectors/facebook/Resources.scala b/backend/api/src/main/scala/io/narrative/connectors/facebook/Resources.scala index 416607c..f5d5489 100644 --- a/backend/api/src/main/scala/io/narrative/connectors/facebook/Resources.scala +++ b/backend/api/src/main/scala/io/narrative/connectors/facebook/Resources.scala @@ -1,6 +1,6 @@ package io.narrative.connectors.facebook -import cats.effect.{Blocker, ContextShift, IO, Resource} +import cats.effect.{IO, Resource} import com.amazonaws.auth.{AWSCredentialsProvider, DefaultAWSCredentialsProviderChain} import com.amazonaws.regions.Regions import com.amazonaws.services.kms.{AWSKMS, AWSKMSClientBuilder} @@ -25,13 +25,13 @@ final case class Resources( ssm: AWSSimpleSystemsManagement, xa: Transactor[IO] ) { - def resolve(value: Config.Value)(implicit contextShift: ContextShift[IO]): IO[String] = + def resolve(value: Config.Value): IO[String] = Resources.resolve(blocker, ssm, value) } object Resources extends LazyLogging { - def apply(config: Config)(implicit contextShift: ContextShift[IO]): Resource[IO, Resources] = + def apply(config: Config): Resource[IO, Resources] = for { - blocker <- Blocker[IO] + blocker <- Resource.unit[IO] awsCredentials = new DefaultAWSCredentialsProviderChain() client <- BlazeClientBuilder[IO](blocker.blockingContext).resource serverEC <- ExecutionContexts.fixedThreadPool[IO](64) @@ -53,9 +53,7 @@ object Resources extends LazyLogging { IO(AWSKMSClientBuilder.standard().withRegion(Regions.US_EAST_1).withCredentials(awsCredentials).build()) )(kms => IO(kms.shutdown())) - def transactor(blocker: Blocker, ssm: AWSSimpleSystemsManagement, db: Config.Database)(implicit - contextShift: ContextShift[IO] - ): Resource[IO, Transactor[IO]] = for { + def transactor(ssm: AWSSimpleSystemsManagement, db: Config.Database): Resource[IO, Transactor[IO]] = for { username <- Resource.eval(resolve(blocker, ssm, db.username)) password <- Resource.eval(resolve(blocker, ssm, db.password)) jdbcUrl <- Resource.eval(resolve(blocker, ssm, db.jdbcUrl)) @@ -70,9 +68,7 @@ object Resources extends LazyLogging { ) } yield xa - def resolve(blocker: Blocker, ssm: AWSSimpleSystemsManagement, value: Config.Value)(implicit - contextShift: ContextShift[IO] - ): IO[String] = + def resolve(ssm: AWSSimpleSystemsManagement, value: Config.Value): IO[String] = value match { case Config.Literal(value) => IO.pure(value) diff --git a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/Logging.scala b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/Logging.scala index fe9a123..11cdf3c 100644 --- a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/Logging.scala +++ b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/Logging.scala @@ -7,7 +7,7 @@ import org.http4s.{HttpApp, Response} import org.http4s.server.middleware._ object Logging extends LazyLogging { - def apply(routes: HttpApp[IO])(implicit cs: ContextShift[IO]): HttpApp[IO] = { + def apply(routes: HttpApp[IO]): HttpApp[IO] = { // The order that these middleware are stacked matters. Any other combination and the X-Request-ID header won't // propagate to the response when the server throws an exception. errorResponseLogging { diff --git a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/profiles/ProfileRoutes.scala b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/profiles/ProfileRoutes.scala index 45e6eaa..ae89bcc 100644 --- a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/profiles/ProfileRoutes.scala +++ b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/profiles/ProfileRoutes.scala @@ -1,7 +1,7 @@ package io.narrative.connectors.facebook.routes.profiles import cats.data.{EitherT, OptionT} -import cats.effect.{ContextShift, IO} +import cats.effect.IO import cats.syntax.show._ import com.typesafe.scalalogging.LazyLogging import io.narrative.connectors.facebook.domain.Profile diff --git a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/tokens/TokenRoutes.scala b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/tokens/TokenRoutes.scala index fb696e4..90ee84d 100644 --- a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/tokens/TokenRoutes.scala +++ b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/tokens/TokenRoutes.scala @@ -1,6 +1,6 @@ package io.narrative.connectors.facebook.routes.tokens -import cats.effect.{ContextShift, IO} +import cats.effect.IO import com.typesafe.scalalogging.LazyLogging import io.narrative.connectors.facebook.routes.Auth import io.narrative.connectors.facebook.services.{ProfileService, TokenMetaRequest} diff --git a/backend/build.sbt b/backend/build.sbt index aad6bd4..a882cd4 100644 --- a/backend/build.sbt +++ b/backend/build.sbt @@ -10,6 +10,7 @@ ThisBuild / publishTo := { } ThisBuild / exportPipelining := false ThisBuild / usePipelining := true +//ThisBuild / scalacOptions += "-P:semanticdb:synthetics:on" name := "narrative-facebook-connector" diff --git a/backend/project/plugins.sbt b/backend/project/plugins.sbt index 19b5fbc..dc5fdcd 100644 --- a/backend/project/plugins.sbt +++ b/backend/project/plugins.sbt @@ -9,3 +9,4 @@ addSbtPlugin("io.narrative" % "common-build" % "4.0.1") addDependencyTreePlugin addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0") +addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.11.0") diff --git a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/ApiClient.scala b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/ApiClient.scala index 9501f1d..9ca30b3 100644 --- a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/ApiClient.scala +++ b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/ApiClient.scala @@ -1,7 +1,7 @@ package io.narrative.connectors.facebook.services import cats.{Eq, Show} -import cats.effect.{ContextShift, IO} +import cats.effect.IO import cats.syntax.applicativeError._ import cats.syntax.show._ import io.circe.{Decoder, Encoder} diff --git a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/AppApiClient.scala b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/AppApiClient.scala index 5abf5f4..58f282a 100644 --- a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/AppApiClient.scala +++ b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/AppApiClient.scala @@ -2,7 +2,6 @@ package io.narrative.connectors.facebook.services import cats.{Eq, Show} import cats.effect.IO -import cats.effect.concurrent.Ref import cats.syntax.applicativeError._ import cats.syntax.option._ import com.typesafe.scalalogging.LazyLogging @@ -14,6 +13,7 @@ import org.http4s.{AuthScheme, BasicCredentials, Credentials, Method, Request, S import org.http4s.client.{Client, UnexpectedStatus} import org.http4s.circe.CirceEntityCodec._ import org.http4s.headers.Authorization +import cats.effect.Ref /** todo(mbabic) scaladocs */ class AppApiClient private ( diff --git a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/FacebookClient.scala b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/FacebookClient.scala index 92635ec..503aaf4 100644 --- a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/FacebookClient.scala +++ b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/FacebookClient.scala @@ -1,7 +1,7 @@ package io.narrative.connectors.facebook.services import cats.data.NonEmptyList -import cats.effect.{Blocker, ContextShift, IO, Timer} +import cats.effect.IO import cats.instances.list._ import cats.syntax.show._ import cats.syntax.traverse._ @@ -17,11 +17,12 @@ import java.time.Instant import scala.jdk.CollectionConverters._ import scala.concurrent.duration._ import scala.util.control.NoStackTrace +import cats.effect.Temporal /** A Facebook API wrapper. */ class FacebookClient(app: FacebookApp, blocker: Blocker)(implicit cs: ContextShift[IO], - timer: Timer[IO] + timer: Temporal[IO] ) extends FacebookClient.Ops[IO] with LazyLogging { import FacebookClient._ diff --git a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/TokenEncryptionService.scala b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/TokenEncryptionService.scala index b2c1c4f..3abdcf3 100644 --- a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/TokenEncryptionService.scala +++ b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/TokenEncryptionService.scala @@ -1,6 +1,6 @@ package io.narrative.connectors.facebook.services -import cats.effect.{Blocker, ContextShift, IO} +import cats.effect.IO import com.amazonaws.services.kms.AWSKMS import com.amazonaws.services.kms.model.{DecryptRequest, EncryptRequest} import io.narrative.connectors.facebook.domain.Token diff --git a/backend/worker/src/main/scala/io/narrative/connectors/facebook/DeliveryProcessor.scala b/backend/worker/src/main/scala/io/narrative/connectors/facebook/DeliveryProcessor.scala index 776d150..cb92f34 100644 --- a/backend/worker/src/main/scala/io/narrative/connectors/facebook/DeliveryProcessor.scala +++ b/backend/worker/src/main/scala/io/narrative/connectors/facebook/DeliveryProcessor.scala @@ -1,7 +1,7 @@ package io.narrative.connectors.facebook import cats.data.OptionT -import cats.effect.{Blocker, ContextShift, IO} +import cats.effect.IO import cats.syntax.show._ import com.typesafe.scalalogging.LazyLogging import io.circe.parser.parse @@ -35,7 +35,7 @@ class DeliveryProcessor( ) extends DeliveryProcessor.Ops[IO] with LazyLogging { - override def processIfDeliverable(job: Job)(implicit cs: ContextShift[IO]): IO[Unit] = { + override def processIfDeliverable(job: Job): IO[Unit] = { val deliverIO = for { command <- OptionT(commandStore.command(job.eventRevision)) .getOrRaise(new RuntimeException(s"could not find command with revision ${job.eventRevision.show}")) @@ -50,7 +50,7 @@ class DeliveryProcessor( deliverIO.handleErrorWith(markFailure(job, job.file, _)) } - private def processDeliverable(job: Job, deliverable: DeliverableEntry)(implicit cs: ContextShift[IO]): IO[Unit] = + private def processDeliverable(job: Job, deliverable: DeliverableEntry): IO[Unit] = for { settings <- OptionT(settingsStore.settings(deliverable.settingsId)) .getOrRaise(new RuntimeException(s"could not find settings with id ${deliverable.settingsId.show}")) @@ -70,8 +70,6 @@ class DeliveryProcessor( event: DeliverableEntry, settings: Settings, token: FacebookToken - )(implicit - cs: ContextShift[IO] ): IO[Unit] = { val (fileResource, parseAudience) = event match { case _: SubscriptionDeliveryEntry => diff --git a/backend/worker/src/main/scala/io/narrative/connectors/facebook/Resources.scala b/backend/worker/src/main/scala/io/narrative/connectors/facebook/Resources.scala index 665b289..79f83fa 100644 --- a/backend/worker/src/main/scala/io/narrative/connectors/facebook/Resources.scala +++ b/backend/worker/src/main/scala/io/narrative/connectors/facebook/Resources.scala @@ -1,6 +1,6 @@ package io.narrative.connectors.facebook -import cats.effect.{Blocker, ContextShift, IO, Resource, Timer} +import cats.effect.{IO, Resource} import com.amazonaws.auth.{AWSCredentialsProvider, DefaultAWSCredentialsProviderChain} import com.amazonaws.regions.Regions import com.amazonaws.services.kms.{AWSKMS, AWSKMSClientBuilder} @@ -25,6 +25,7 @@ import io.narrative.connectors.queue.QueueStore import io.narrative.connectors.spark.{ParquetTransformer, SparkSessions} import io.narrative.microframework.config.Stage import org.http4s.blaze.client.BlazeClientBuilder +import cats.effect.Temporal final case class Resources( eventConsumer: EventConsumer.Ops[IO], @@ -36,11 +37,10 @@ final case class Resources( ) object Resources extends LazyLogging { def apply(config: Config, parallelizationFactor: Int)(implicit - contextShift: ContextShift[IO], - timer: Timer[IO] + timer: Temporal[IO] ): Resource[IO, Resources] = for { - blocker <- Blocker[IO] + blocker <- Resource.unit[IO] awsCredentials = new DefaultAWSCredentialsProviderChain() ssm <- SSMResources.ssmClient(logger.underlying, blocker, awsCredentials) xa <- transactor(blocker, ssm, config.database, parallelizationFactor) @@ -88,9 +88,7 @@ object Resources extends LazyLogging { transactor = xa ) - def baseAppApiClient(blocker: Blocker, config: Config, ssm: AWSSimpleSystemsManagement)(implicit - cs: ContextShift[IO] - ): Resource[IO, BaseAppApiClient.Ops[IO]] = + def baseAppApiClient(config: Config, ssm: AWSSimpleSystemsManagement): Resource[IO, BaseAppApiClient.Ops[IO]] = for { blazeClient <- BlazeClientBuilder[IO](scala.concurrent.ExecutionContext.Implicits.global).resource id <- Resource.eval(resolve(blocker, ssm, config.narrativeApi.clientId)).map(ClientId.apply) @@ -110,29 +108,23 @@ object Resources extends LazyLogging { private def encryptionService( awsCredentials: AWSCredentialsProvider, - blocker: Blocker, config: Config, ssm: AWSSimpleSystemsManagement - )(implicit contextShift: ContextShift[IO]): Resource[IO, TokenEncryptionService.Ops[IO]] = for { + ): Resource[IO, TokenEncryptionService.Ops[IO]] = for { kms <- awsKms(awsCredentials) keyId <- Resource.eval(resolve(blocker, ssm, config.kms.tokenEncryptionKeyId)).map(KmsKeyId.apply) } yield new TokenEncryptionService(blocker, keyId, kms) - private def facebookClient(blocker: Blocker, config: Config, ssm: AWSSimpleSystemsManagement)(implicit - contextShift: ContextShift[IO], - timer: Timer[IO] + private def facebookClient(config: Config, ssm: AWSSimpleSystemsManagement)(implicit + timer: Temporal[IO] ): IO[FacebookClient.Ops[IO]] = for { id <- resolve(blocker, ssm, config.facebook.appId).map(FacebookApp.Id.apply) secret <- resolve(blocker, ssm, config.facebook.appSecret).map(FacebookApp.Secret.apply) } yield new FacebookClient(FacebookApp(id, secret), blocker) - private def transactor( - blocker: Blocker, - ssm: AWSSimpleSystemsManagement, + private def transactor(ssm: AWSSimpleSystemsManagement, db: Config.Database, parallelizationFactor: Int - )(implicit - contextShift: ContextShift[IO] ): Resource[IO, Transactor[IO]] = for { username <- Resource.eval(resolve(blocker, ssm, db.username)) password <- Resource.eval(resolve(blocker, ssm, db.password)) @@ -151,9 +143,7 @@ object Resources extends LazyLogging { ) } yield xa - private def resolve(blocker: Blocker, ssm: AWSSimpleSystemsManagement, value: Config.Value)(implicit - contextShift: ContextShift[IO] - ): IO[String] = + private def resolve(ssm: AWSSimpleSystemsManagement, value: Config.Value): IO[String] = value match { case Config.Literal(value) => IO.pure(value) From 746b6cf67601ee77d1ee6bbf424c78bbb3553734 Mon Sep 17 00:00:00 2001 From: Sami Dalouche Date: Wed, 30 Aug 2023 09:07:24 -0600 Subject: [PATCH 2/3] chore: update dependencies to match CE3 ecosystem --- backend/build.sbt | 4 ++-- backend/project/LibraryDependencies.scala | 27 ++++++++++++----------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/backend/build.sbt b/backend/build.sbt index a882cd4..911d4f1 100644 --- a/backend/build.sbt +++ b/backend/build.sbt @@ -76,7 +76,7 @@ lazy val `api` = project Http4s.`http4s-dsl`, Http4s.`http4s-circe`, Http4s.`http4s-server`, - Http4s.`http4s-blaze-server`, + Http4s.`http4s-ember-server`, NarrativeBackend.`narrative-common-ssm`, NarrativeBackend.`narrative-microframework-config`, ScalaTest.`scalatest` % "test" @@ -128,7 +128,7 @@ lazy val `services` = project libraryDependencies ++= Seq( Aws.`aws-java-sdk-kms`, Facebook.`facebook-java-business-sdk`, - Http4s.`http4s-blaze-client`, + Http4s.`http4s-ember-client`, Http4s.`http4s-circe`, NarrativeBackend.`narrative-common-catsretry`, NarrativeBackend.`narrative-microframework-config`, diff --git a/backend/project/LibraryDependencies.scala b/backend/project/LibraryDependencies.scala index 7caaefa..f48c4c4 100644 --- a/backend/project/LibraryDependencies.scala +++ b/backend/project/LibraryDependencies.scala @@ -2,7 +2,7 @@ import sbt._ object LibraryDependencies { object NarrativeBackend { - val version = "20.0.1" + val version = "20.0.21" val `narrative-common-ssm` = "io.narrative" %% "common-ssm" % version val `narrative-common-catsretry` = "io.narrative" %% "common-catsretry" % version val `narrative-common-doobie-testkit` = "io.narrative" %% "common-doobie-testkit" % version @@ -15,7 +15,7 @@ object LibraryDependencies { } object NarrativeConnectorFramework { - val version = "0.1.0" + val version = "0.1.1" val `connector-framework-core` = "io.narrative" %% "connector-framework-core" % version @@ -29,31 +29,32 @@ object LibraryDependencies { } object Cats { - val `cats-core` = "org.typelevel" %% "cats-core" % "2.8.0" - val `cats-effect` = "org.typelevel" %% "cats-effect" % "2.5.4" + val `cats-core` = "org.typelevel" %% "cats-core" % "2.9.0" + val `cats-effect` = "org.typelevel" %% "cats-effect" % "3.5.1" } object CatsRetry { - val version = "2.1.1" + val version = "3.1.0" val `cats-retry` = "com.github.cb372" %% "cats-retry" % version } object Circe { - val version = "0.14.2" + val version = "0.14.5" val `circe-core` = "io.circe" %% "circe-core" % version val `circe-generic` = "io.circe" %% "circe-generic" % version - val `circe-generic-extras` = "io.circe" %% "circe-generic-extras" % version + val `circe-generic-extras` = "io.circe" %% "circe-generic-extras" % "0.14.3" val `circe-literal` = "io.circe" %% "circe-literal" % version val `circe-parser` = "io.circe" %% "circe-parser" % version } object CirceFs2 { - val version = "0.13.0" + // TODO: migrate to https://fs2-data.gnieh.org/documentation/json/libraries/#circe + val version = "0.14.1" val `circe-fs2` = "io.circe" %% "circe-fs2" % version } object Doobie { - val version = "0.9.0" + val version = "1.0.0-RC4" val `doobie-core` = "org.tpolecat" %% "doobie-core" % version val `doobie-postgres` = "org.tpolecat" %% "doobie-postgres" % version val `doobie-hikari` = "org.tpolecat" %% "doobie-hikari" % version @@ -65,19 +66,19 @@ object LibraryDependencies { } object Fs2 { - val version = "2.4.4" + val version = "3.7.0" val `fs2-core` = "co.fs2" %% "fs2-core" % version val `fs2-io` = "co.fs2" %% "fs2-io" % version } object Http4s { - val version = "0.22.7" + val version = "0.23.22" val `http4s-core` = "org.http4s" %% "http4s-dsl" % version val `http4s-dsl` = "org.http4s" %% "http4s-dsl" % version val `http4s-circe` = "org.http4s" %% "http4s-circe" % version val `http4s-server` = "org.http4s" %% "http4s-server" % version - val `http4s-blaze-server` = "org.http4s" %% "http4s-blaze-server" % version - val `http4s-blaze-client` = "org.http4s" %% "http4s-blaze-client" % version + val `http4s-ember-server` = "org.http4s" %% "http4s-ember-server" % version + val `http4s-ember-client` = "org.http4s" %% "http4s-ember-client" % version } object Jackson { From 683a6e22bbb66047b941699e6993abd8e5da9808 Mon Sep 17 00:00:00 2001 From: Sami Dalouche Date: Wed, 30 Aug 2023 09:30:29 -0600 Subject: [PATCH 3/3] chore: manual pass to make project compile with CE3 ecosystem --- .../connectors/facebook/Resources.scala | 30 +++----- .../connectors/facebook/Server.scala | 23 ++++-- .../connectors/facebook/routes/Auth.scala | 1 - .../connectors/facebook/routes/Logging.scala | 4 +- .../routes/profiles/ProfileRoutes.scala | 4 +- .../facebook/routes/tokens/TokenRoutes.scala | 2 +- .../facebook/services/ApiClient.scala | 5 +- .../facebook/services/AppApiClient.scala | 3 +- .../facebook/services/FacebookClient.scala | 35 ++++----- .../services/TokenEncryptionService.scala | 5 +- .../facebook/stores/CommandStore.scala | 1 - .../facebook/CommandProcessor.scala | 72 ++++++++++--------- .../facebook/DeliveryProcessor.scala | 12 ++-- .../narrative/connectors/facebook/Main.scala | 19 +++-- .../connectors/facebook/Resources.scala | 57 +++++++-------- 15 files changed, 131 insertions(+), 142 deletions(-) diff --git a/backend/api/src/main/scala/io/narrative/connectors/facebook/Resources.scala b/backend/api/src/main/scala/io/narrative/connectors/facebook/Resources.scala index f5d5489..82cd472 100644 --- a/backend/api/src/main/scala/io/narrative/connectors/facebook/Resources.scala +++ b/backend/api/src/main/scala/io/narrative/connectors/facebook/Resources.scala @@ -11,39 +11,31 @@ import doobie.Transactor import doobie.hikari.HikariTransactor import doobie.util.ExecutionContexts import io.narrative.common.ssm.SSMResources -import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.ember.client.EmberClientBuilder import org.http4s.client.Client -import scala.concurrent.ExecutionContext - final case class Resources( awsCredentials: AWSCredentialsProvider, - blocker: Blocker, client: Client[IO], kms: AWSKMS, - serverEC: ExecutionContext, ssm: AWSSimpleSystemsManagement, xa: Transactor[IO] ) { def resolve(value: Config.Value): IO[String] = - Resources.resolve(blocker, ssm, value) + Resources.resolve(ssm, value) } object Resources extends LazyLogging { def apply(config: Config): Resource[IO, Resources] = for { - blocker <- Resource.unit[IO] - awsCredentials = new DefaultAWSCredentialsProviderChain() - client <- BlazeClientBuilder[IO](blocker.blockingContext).resource - serverEC <- ExecutionContexts.fixedThreadPool[IO](64) - ssm <- SSMResources.ssmClient(logger.underlying, blocker, awsCredentials) + awsCredentials <- Resource.eval(IO.blocking(new DefaultAWSCredentialsProviderChain())) + client <- EmberClientBuilder.default[IO].build + ssm <- SSMResources.ssmClient(logger.underlying, awsCredentials) kms <- awsKms(awsCredentials) - xa <- transactor(blocker, ssm, config.database) + xa <- transactor(ssm, config.database) } yield Resources( awsCredentials = awsCredentials, - blocker = blocker, client = client, kms = kms, - serverEC = serverEC, ssm = ssm, xa = xa ) @@ -54,12 +46,11 @@ object Resources extends LazyLogging { )(kms => IO(kms.shutdown())) def transactor(ssm: AWSSimpleSystemsManagement, db: Config.Database): Resource[IO, Transactor[IO]] = for { - username <- Resource.eval(resolve(blocker, ssm, db.username)) - password <- Resource.eval(resolve(blocker, ssm, db.password)) - jdbcUrl <- Resource.eval(resolve(blocker, ssm, db.jdbcUrl)) + username <- Resource.eval(resolve(ssm, db.username)) + password <- Resource.eval(resolve(ssm, db.password)) + jdbcUrl <- Resource.eval(resolve(ssm, db.jdbcUrl)) connectEC <- ExecutionContexts.fixedThreadPool[IO](32) xa <- HikariTransactor.newHikariTransactor[IO]( - blocker = blocker, connectEC = connectEC, driverClassName = "org.postgresql.Driver", pass = password, @@ -73,8 +64,7 @@ object Resources extends LazyLogging { case Config.Literal(value) => IO.pure(value) case Config.SsmParam(value, encrypted) => - blocker - .blockOn(IO(ssm.getParameter(new GetParameterRequest().withName(value).withWithDecryption(encrypted)))) + IO.blocking(ssm.getParameter(new GetParameterRequest().withName(value).withWithDecryption(encrypted))) .map(_.getParameter.getValue) .attempt .map { diff --git a/backend/api/src/main/scala/io/narrative/connectors/facebook/Server.scala b/backend/api/src/main/scala/io/narrative/connectors/facebook/Server.scala index 51eaee4..d3ccd62 100644 --- a/backend/api/src/main/scala/io/narrative/connectors/facebook/Server.scala +++ b/backend/api/src/main/scala/io/narrative/connectors/facebook/Server.scala @@ -1,6 +1,7 @@ package io.narrative.connectors.facebook import cats.effect.{IO, IOApp, Resource} +import com.comcast.ip4s.{Host, Port} import com.typesafe.scalalogging.LazyLogging import io.narrative.connectors.facebook.routes.Logging import io.narrative.connectors.facebook.routes.profiles.ProfileRoutes @@ -15,7 +16,7 @@ import io.narrative.connectors.facebook.services.{ } import io.narrative.connectors.facebook.stores.ProfileStore import org.http4s.{HttpApp, Uri} -import org.http4s.blaze.server.BlazeServerBuilder +import org.http4s.ember.server.EmberServerBuilder import org.http4s.server.Router import org.http4s.server.middleware.CORS @@ -25,10 +26,20 @@ object Server extends IOApp.Simple with LazyLogging { config <- Resource.eval(Config()) resources <- Resources(config) routes <- Resource.eval(router(config, resources)) - server <- BlazeServerBuilder[IO](resources.serverEC) - .bindHttp(config.server.port.value.toInt, "0.0.0.0") + server <- EmberServerBuilder + .default[IO] + .withHost( + Host + .fromString("0.0.0.0") + .getOrElse(throw new RuntimeException("Programming error: host 0.0.0.0 is not valid")) + ) + .withPort( + Port + .fromString(config.server.port.value) + .getOrElse(throw new RuntimeException(s"Programming error: port ${config.server.port.value} is not valid")) + ) .withHttpApp(routes) - .resource + .build } yield server server.use(_ => IO.never).void @@ -41,11 +52,11 @@ object Server extends IOApp.Simple with LazyLogging { kmsKeyId <- resources.resolve(config.kms.tokenEncryptionKeyId).map(KmsKeyId.apply) apiBaseUri <- resources.resolve(config.narrativeApi.baseUri).map(Uri.unsafeFromString) apiClient = new ApiClient(baseUri = apiBaseUri, client = resources.client) - fbClient = new FacebookClient(FacebookApp(appId, appSecret), blocker = resources.blocker) + fbClient = new FacebookClient(FacebookApp(appId, appSecret)) profileService = new ProfileService( apiClient, fbClient, - new TokenEncryptionService(resources.blocker, kmsKeyId, resources.kms), + new TokenEncryptionService(kmsKeyId, resources.kms), ProfileStore(resources.xa) ) http = CORS.policy.withAllowCredentials(false) { diff --git a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/Auth.scala b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/Auth.scala index 34fb58a..3d82f42 100644 --- a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/Auth.scala +++ b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/Auth.scala @@ -2,7 +2,6 @@ package io.narrative.connectors.facebook.routes import cats.data.{Kleisli, OptionT} import cats.effect.IO -import cats.syntax.applicativeError._ import com.typesafe.scalalogging.LazyLogging import io.narrative.connectors.facebook.services.BearerToken import org.http4s.client.UnexpectedStatus diff --git a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/Logging.scala b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/Logging.scala index 11cdf3c..90f9950 100644 --- a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/Logging.scala +++ b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/Logging.scala @@ -28,7 +28,7 @@ object Logging extends LazyLogging { resp <- service(req) _ <- resp.status.code match { case code if code >= 400 => - org.http4s.internal.Logger.logMessageWithBodyText[IO, Response[IO]](resp)( + org.http4s.internal.Logger.logMessageWithBodyText[IO](resp)( logHeaders = true, logBodyText = truncateBody(32768) _ )(logWarn) @@ -40,5 +40,5 @@ object Logging extends LazyLogging { } private def truncateBody(maxBytes: Int)(bytes: fs2.Stream[IO, Byte]): Option[IO[String]] = - bytes.take(maxBytes.toLong).through(fs2.text.utf8Decode).compile.last.map(_.getOrElse("")).some + bytes.take(maxBytes.toLong).through(fs2.text.utf8.decode).compile.last.map(_.getOrElse("")).some } diff --git a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/profiles/ProfileRoutes.scala b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/profiles/ProfileRoutes.scala index ae89bcc..9f11d2e 100644 --- a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/profiles/ProfileRoutes.scala +++ b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/profiles/ProfileRoutes.scala @@ -11,9 +11,7 @@ import org.http4s._ import org.http4s.circe.CirceEntityCodec._ import org.http4s.dsl.io._ -class ProfileRoutes(service: ProfileService.Ops[IO])(implicit - contextShift: ContextShift[IO] -) extends LazyLogging { +class ProfileRoutes(service: ProfileService.Ops[IO]) extends LazyLogging { val routes: HttpRoutes[IO] = Auth.auth { case GET -> Root as auth => profiles(auth) diff --git a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/tokens/TokenRoutes.scala b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/tokens/TokenRoutes.scala index 90ee84d..3faa26c 100644 --- a/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/tokens/TokenRoutes.scala +++ b/backend/api/src/main/scala/io/narrative/connectors/facebook/routes/tokens/TokenRoutes.scala @@ -8,7 +8,7 @@ import org.http4s._ import org.http4s.circe.CirceEntityCodec._ import org.http4s.dsl.io._ -class TokenRoutes(service: ProfileService.Ops[IO])(implicit contextShift: ContextShift[IO]) extends LazyLogging { +class TokenRoutes(service: ProfileService.Ops[IO]) extends LazyLogging { val routes: HttpRoutes[IO] = Auth.noauth { case req @ POST -> Root / "metadata" => tokenMeta(req) diff --git a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/ApiClient.scala b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/ApiClient.scala index 9ca30b3..19c9652 100644 --- a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/ApiClient.scala +++ b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/ApiClient.scala @@ -2,8 +2,7 @@ package io.narrative.connectors.facebook.services import cats.{Eq, Show} import cats.effect.IO -import cats.syntax.applicativeError._ -import cats.syntax.show._ +import cats.implicits._ import io.circe.{Decoder, Encoder} import io.circe.generic.extras.semiauto.{deriveConfiguredDecoder, deriveConfiguredEncoder} import org.http4s.circe.CirceEntityCodec._ @@ -13,7 +12,7 @@ import org.http4s.headers.Authorization import io.narrative.connectors.facebook.domain.Profile /** todo(mbabic) scaladocs */ -class ApiClient(baseUri: Uri, client: Client[IO])(implicit contextShift: ContextShift[IO]) extends ApiClient.Ops[IO] { +class ApiClient(baseUri: Uri, client: Client[IO]) extends ApiClient.Ops[IO] { import ApiClient._ override def company(auth: BearerToken): IO[ApiCompany] = { diff --git a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/AppApiClient.scala b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/AppApiClient.scala index 58f282a..c6e93dc 100644 --- a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/AppApiClient.scala +++ b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/AppApiClient.scala @@ -2,8 +2,7 @@ package io.narrative.connectors.facebook.services import cats.{Eq, Show} import cats.effect.IO -import cats.syntax.applicativeError._ -import cats.syntax.option._ +import cats.implicits._ import com.typesafe.scalalogging.LazyLogging import fs2.Stream import io.circe.{Decoder, Encoder} diff --git a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/FacebookClient.scala b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/FacebookClient.scala index 503aaf4..33e09cb 100644 --- a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/FacebookClient.scala +++ b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/FacebookClient.scala @@ -2,9 +2,7 @@ package io.narrative.connectors.facebook.services import cats.data.NonEmptyList import cats.effect.IO -import cats.instances.list._ -import cats.syntax.show._ -import cats.syntax.traverse._ +import cats.implicits._ import com.facebook.ads.{sdk => fb} import com.google.gson.{JsonArray, JsonObject, JsonPrimitive} import com.typesafe.scalalogging.LazyLogging @@ -17,14 +15,9 @@ import java.time.Instant import scala.jdk.CollectionConverters._ import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import cats.effect.Temporal /** A Facebook API wrapper. */ -class FacebookClient(app: FacebookApp, blocker: Blocker)(implicit - cs: ContextShift[IO], - timer: Temporal[IO] -) extends FacebookClient.Ops[IO] - with LazyLogging { +class FacebookClient(app: FacebookApp) extends FacebookClient.Ops[IO] with LazyLogging { import FacebookClient._ // Facebook has special support for constructing tokens that allow an app to perform actions by concatenating the @@ -239,16 +232,14 @@ class FacebookClient(app: FacebookApp, blocker: Blocker)(implicit private def runIO[A](apiCall: => A): IO[A] = retryingOnSomeErrors(retryPolicy, shouldRetry, logError) { - blocker.blockOn( - IO(apiCall) - .timeout(HttpTimeout) - .handleErrorWith { - // https://developers.facebook.com/docs/graph-api/using-graph-api/error-handling/ - case e: fb.APIException.FailedRequestException if getErrorCode(e).contains(190) => - IO.raiseError(InvalidAccessToken) - case t => IO.raiseError(t) - } - ) + IO.blocking(apiCall) + .timeout(HttpTimeout) + .handleErrorWith { + // https://developers.facebook.com/docs/graph-api/using-graph-api/error-handling/ + case e: fb.APIException.FailedRequestException if getErrorCode(e).contains(190) => + IO.raiseError(InvalidAccessToken) + case t => IO.raiseError(t) + } } } @@ -373,10 +364,10 @@ object FacebookClient extends LazyLogging { payload } - private def shouldRetry(t: Throwable): Boolean = t match { + private def shouldRetry(t: Throwable): IO[Boolean] = t match { case failedRequest: fb.APIException.FailedRequestException => - getErrorCode(failedRequest).exists(RetryableErrorCodes.contains) - case _ => false + getErrorCode(failedRequest).exists(RetryableErrorCodes.contains).pure[IO] + case _ => false.pure[IO] } private def logError(t: Throwable, retryDetails: RetryDetails): IO[Unit] = diff --git a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/TokenEncryptionService.scala b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/TokenEncryptionService.scala index 3abdcf3..5122f9d 100644 --- a/backend/services/src/main/scala/io/narrative/connectors/facebook/services/TokenEncryptionService.scala +++ b/backend/services/src/main/scala/io/narrative/connectors/facebook/services/TokenEncryptionService.scala @@ -8,8 +8,7 @@ import io.narrative.connectors.facebook.domain.Token import java.nio.ByteBuffer import java.nio.charset.StandardCharsets -class TokenEncryptionService(blocker: Blocker, keyId: KmsKeyId, kms: AWSKMS)(implicit contextShift: ContextShift[IO]) - extends TokenEncryptionService.Ops[IO] { +class TokenEncryptionService(keyId: KmsKeyId, kms: AWSKMS) extends TokenEncryptionService.Ops[IO] { override def decrypt(token: Token.Encrypted): IO[FacebookToken] = { val req = new DecryptRequest().withKeyId(keyId.value).withCiphertextBlob(ByteBuffer.wrap(token.value)) runIO(new String(kms.decrypt(req).getPlaintext.array(), StandardCharsets.UTF_8)).map(FacebookToken.apply) @@ -22,7 +21,7 @@ class TokenEncryptionService(blocker: Blocker, keyId: KmsKeyId, kms: AWSKMS)(imp runIO(kms.encrypt(req).getCiphertextBlob.array()).map(Token.Encrypted.apply) } - private def runIO[A](f: => A): IO[A] = blocker.blockOn(IO(f)) + private def runIO[A](f: => A): IO[A] = IO.blocking(f) } object TokenEncryptionService { diff --git a/backend/stores/src/main/scala/io/narrative/connectors/facebook/stores/CommandStore.scala b/backend/stores/src/main/scala/io/narrative/connectors/facebook/stores/CommandStore.scala index c6e736c..603916f 100644 --- a/backend/stores/src/main/scala/io/narrative/connectors/facebook/stores/CommandStore.scala +++ b/backend/stores/src/main/scala/io/narrative/connectors/facebook/stores/CommandStore.scala @@ -6,7 +6,6 @@ import cats.syntax.functor._ import doobie.ConnectionIO import doobie.implicits._ import doobie.postgres.implicits._ -import doobie.implicits.legacy.instant._ import doobie.Fragments.set import doobie.util.transactor.Transactor import io.circe.syntax._ diff --git a/backend/worker/src/main/scala/io/narrative/connectors/facebook/CommandProcessor.scala b/backend/worker/src/main/scala/io/narrative/connectors/facebook/CommandProcessor.scala index dc07ff5..fbde079 100644 --- a/backend/worker/src/main/scala/io/narrative/connectors/facebook/CommandProcessor.scala +++ b/backend/worker/src/main/scala/io/narrative/connectors/facebook/CommandProcessor.scala @@ -3,12 +3,11 @@ package io.narrative.connectors.facebook import cats.data.EitherT import cats.effect.IO import cats.implicits._ +import cats.~> import com.typesafe.scalalogging.LazyLogging import doobie.ConnectionIO -import doobie.implicits._ import io.circe.JsonObject import io.circe.syntax.EncoderOps - import io.narrative.connectors.api.connections.ConnectionsApi import io.narrative.connectors.api.events.EventsApi.DeliveryEvent import io.narrative.connectors.api.files.BackwardsCompatibleFilesApi @@ -34,28 +33,32 @@ class CommandProcessor( EitherT.fromEither[IO](jsonObject.asJson.as[Profile.QuickSettings]).rethrowT }.sequence - override def process(event: CommandProcessorEvent): ConnectionIO[CommandProcessor.Result] = { + override def process( + event: CommandProcessorEvent, + toConnectionIO: IO ~> ConnectionIO + ): ConnectionIO[CommandProcessor.Result] = { for { _ <- doobie.free.connection.delay(logger.info(s"processing ${event}")) result <- event match { case SnapshotAppendedCPEvent(metadata, snapshotAppended) => for { - connection <- connectionsApi.connection(snapshotAppended.connectionId).to[ConnectionIO] - quickSettings <- extractQuickSettings(connection.quickSettings).to[ConnectionIO] + connection <- toConnectionIO(connectionsApi.connection(snapshotAppended.connectionId)) + quickSettings <- toConnectionIO(extractQuickSettings(connection.quickSettings)) - settings <- settingsService - .getOrCreate( - quickSettings, - Settings.Id - .fromConnection( - Profile.Id.apply(connection.profileId), - ConnectionId.apply(snapshotAppended.connectionId) - ), - Audience.Name(s"Connection ${ConnectionId.apply(snapshotAppended.connectionId).show} Audience"), - Profile.Id.apply(connection.profileId) - ) - .to[ConnectionIO] - files <- filesApi.getFiles(event.metadata.revision.value).to[ConnectionIO] + settings <- toConnectionIO( + settingsService + .getOrCreate( + quickSettings, + Settings.Id + .fromConnection( + Profile.Id.apply(connection.profileId), + ConnectionId.apply(snapshotAppended.connectionId) + ), + Audience.Name(s"Connection ${ConnectionId.apply(snapshotAppended.connectionId).show} Audience"), + Profile.Id.apply(connection.profileId) + ) + ) + files <- toConnectionIO(filesApi.getFiles(event.metadata.revision.value)) newJobs = files.map { resp => Job( eventRevision = Revision.apply(event.metadata.revision.value), @@ -78,22 +81,23 @@ class CommandProcessor( } yield result case SubscriptionDeliveryCPEvent(metadata, subscriptionDelivery) => for { - quickSettings <- extractQuickSettings(subscriptionDelivery.quickSettings).to[ConnectionIO] - settings <- settingsService - .getOrCreate( - quickSettings, - Settings.Id - .fromSubscription( - Profile.Id.apply(subscriptionDelivery.profileId), - SubscriptionId.apply(subscriptionDelivery.subscriptionId) + quickSettings <- toConnectionIO(extractQuickSettings(subscriptionDelivery.quickSettings)) + settings <- toConnectionIO( + settingsService + .getOrCreate( + quickSettings, + Settings.Id + .fromSubscription( + Profile.Id.apply(subscriptionDelivery.profileId), + SubscriptionId.apply(subscriptionDelivery.subscriptionId) + ), + Audience.Name( + s"Subscription ${SubscriptionId.apply(subscriptionDelivery.subscriptionId).show} Audience" ), - Audience.Name( - s"Subscription ${SubscriptionId.apply(subscriptionDelivery.subscriptionId).show} Audience" - ), - Profile.Id.apply(subscriptionDelivery.profileId) - ) - .to[ConnectionIO] - files <- filesApi.getFiles(metadata.revision.value).to[ConnectionIO] + Profile.Id.apply(subscriptionDelivery.profileId) + ) + ) + files <- toConnectionIO(filesApi.getFiles(metadata.revision.value)) newJobs = files.map { resp => Job( eventRevision = Revision.apply(metadata.revision.value), @@ -137,7 +141,7 @@ object CommandProcessor { trait ReadOps[F[_]] {} trait WriteOps[F[_]] { - def process(event: CommandProcessorEvent): F[Result] + def process(event: CommandProcessorEvent, toConnectionIO: IO ~> ConnectionIO): F[Result] } trait Ops[F[_]] extends ReadOps[F] with WriteOps[F] diff --git a/backend/worker/src/main/scala/io/narrative/connectors/facebook/DeliveryProcessor.scala b/backend/worker/src/main/scala/io/narrative/connectors/facebook/DeliveryProcessor.scala index cb92f34..7232c97 100644 --- a/backend/worker/src/main/scala/io/narrative/connectors/facebook/DeliveryProcessor.scala +++ b/backend/worker/src/main/scala/io/narrative/connectors/facebook/DeliveryProcessor.scala @@ -4,8 +4,8 @@ import cats.data.OptionT import cats.effect.IO import cats.syntax.show._ import com.typesafe.scalalogging.LazyLogging +import fs2.io.file.Flags import io.circe.parser.parse - import io.narrative.connectors.api.connections.ConnectionsApi import io.narrative.connectors.api.events.EventsApi.DeliveryEvent import io.narrative.connectors.api.events.EventsApi.DeliveryEvent.{SnapshotAppended, SubscriptionDelivery} @@ -30,8 +30,7 @@ class DeliveryProcessor( encryption: TokenEncryptionService.Ops[IO], fb: FacebookClient.Ops[IO], profileStore: ProfileStore.Ops[IO], - parquetTransformer: ParquetTransformer, - blocker: Blocker + parquetTransformer: ParquetTransformer ) extends DeliveryProcessor.Ops[IO] with LazyLogging { @@ -85,8 +84,9 @@ class DeliveryProcessor( } fileResource.use { path => fs2.io.file - .readAll[IO](path, blocker, 65536) - .through(fs2.text.utf8Decode) + .Files[IO] + .readAll(fs2.io.file.Path.fromNioPath(path), 65536, Flags.Read) + .through(fs2.text.utf8.decode) .through(fs2.text.lines) .map(parse(_).toOption.map(parseAudience)) .unNone @@ -130,7 +130,7 @@ object DeliveryProcessor { trait ReadOps[F[_]] {} trait WriteOps[F[_]] { - def processIfDeliverable(job: Job)(implicit cs: ContextShift[F]): F[Unit] + def processIfDeliverable(job: Job): F[Unit] } trait Ops[F[_]] extends ReadOps[F] with WriteOps[F] diff --git a/backend/worker/src/main/scala/io/narrative/connectors/facebook/Main.scala b/backend/worker/src/main/scala/io/narrative/connectors/facebook/Main.scala index b78b1a1..09931f9 100644 --- a/backend/worker/src/main/scala/io/narrative/connectors/facebook/Main.scala +++ b/backend/worker/src/main/scala/io/narrative/connectors/facebook/Main.scala @@ -3,13 +3,15 @@ package io.narrative.connectors.facebook import cats.effect.{IO, IOApp, Resource} import cats.implicits.catsSyntaxTuple2Parallel import com.typesafe.scalalogging.LazyLogging -import doobie.ConnectionIO - +import doobie.{ConnectionIO, WeakAsync} import io.narrative.connectors.api.events.EventsApi.DeliveryEvent import io.narrative.connectors.facebook.CommandProcessor.{SnapshotAppendedCPEvent, SubscriptionDeliveryCPEvent} import io.narrative.connectors.queue.QueueConsumer +import org.typelevel.log4cats.LoggerFactory +import org.typelevel.log4cats.slf4j.Slf4jFactory object Main extends IOApp.Simple with LazyLogging { + private implicit val loggingConnectionIO: LoggerFactory[ConnectionIO] = Slf4jFactory.create[ConnectionIO] val parallelizationFactor = 1 override def run: IO[Unit] = { val worker = for { @@ -23,21 +25,24 @@ object Main extends IOApp.Simple with LazyLogging { // Kick off command consumption and multiple job processors in parallel. // See https://typelevel.org/cats-effect/docs/2.x/datatypes/io#parallelism - private def run(resources: Resources): IO[Unit] = + private def run(resources: Resources): IO[Unit] = WeakAsync.liftK[IO, ConnectionIO].use { toConnectionIO => ( resources.eventConsumer.consume(event => event.payload match { case sd: DeliveryEvent.SubscriptionDelivery => - resources.eventProcessor.process(SubscriptionDeliveryCPEvent(event.metadata, sd)).map(_ => ()) + resources.eventProcessor + .process(SubscriptionDeliveryCPEvent(event.metadata, sd), toConnectionIO) + .map(_ => ()) case sa: DeliveryEvent.SnapshotAppended => - resources.eventProcessor.process(SnapshotAppendedCPEvent(event.metadata, sa)).map(_ => ()) + resources.eventProcessor.process(SnapshotAppendedCPEvent(event.metadata, sa), toConnectionIO).map(_ => ()) case DeliveryEvent.ConnectionCreated(connectionId) => - doobie.free.connection.delay(logger.info(s"Connection-created event [$connectionId] ignored")) + loggingConnectionIO.getLogger.info(s"Connection-created event [$connectionId] ignored") } ), QueueConsumer.parallelize(resources.queueStore, resources.transactor, parallelizationFactor) { job => - resources.deliveryProcessor.processIfDeliverable(job).to[ConnectionIO] + toConnectionIO(resources.deliveryProcessor.processIfDeliverable(job)) } ).parMapN((_, _) => ()) + } } diff --git a/backend/worker/src/main/scala/io/narrative/connectors/facebook/Resources.scala b/backend/worker/src/main/scala/io/narrative/connectors/facebook/Resources.scala index 79f83fa..7d2e17b 100644 --- a/backend/worker/src/main/scala/io/narrative/connectors/facebook/Resources.scala +++ b/backend/worker/src/main/scala/io/narrative/connectors/facebook/Resources.scala @@ -24,7 +24,7 @@ import io.narrative.connectors.facebook.stores.{CommandStore, ProfileStore, Sett import io.narrative.connectors.queue.QueueStore import io.narrative.connectors.spark.{ParquetTransformer, SparkSessions} import io.narrative.microframework.config.Stage -import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.ember.client.EmberClientBuilder import cats.effect.Temporal final case class Resources( @@ -40,17 +40,16 @@ object Resources extends LazyLogging { timer: Temporal[IO] ): Resource[IO, Resources] = for { - blocker <- Resource.unit[IO] - awsCredentials = new DefaultAWSCredentialsProviderChain() - ssm <- SSMResources.ssmClient(logger.underlying, blocker, awsCredentials) - xa <- transactor(blocker, ssm, config.database, parallelizationFactor) - api <- baseAppApiClient(blocker, config, ssm) - encryption <- encryptionService(awsCredentials, blocker, config, ssm) - fb <- Resource.eval(facebookClient(blocker, config, ssm)) - sparkSessions <- Resource.eval(SparkSessions.default[IO](blocker)) + awsCredentials <- Resource.eval(IO.blocking(new DefaultAWSCredentialsProviderChain())) + ssm <- SSMResources.ssmClient(logger.underlying, awsCredentials) + xa <- transactor(ssm, config.database, parallelizationFactor) + api <- baseAppApiClient(config, ssm) + encryption <- encryptionService(awsCredentials, config, ssm) + fb <- Resource.eval(facebookClient(config, ssm)) + sparkSessions <- Resource.eval(SparkSessions.default[IO]) sparkSession <- Resource.eval(sparkSessions.getSparkSession) - parquetTransformer = new ParquetTransformer(sparkSession, blocker) + parquetTransformer = new ParquetTransformer(sparkSession) commandStore = new CommandStore() profileStore = ProfileStore(xa) @@ -63,7 +62,7 @@ object Resources extends LazyLogging { filesApiV1 = new FilesApiV1(api) filesApiV2 = new FilesApiV2(api) eventsApi = new EventsApi(api) - filesApi = new BackwardsCompatibleFilesApi(blocker, eventsApi, filesApiV1, filesApiV2) + filesApi = new BackwardsCompatibleFilesApi(eventsApi, filesApiV1, filesApiV2) commandProcessor = new CommandProcessor(commandStore, jobStore, settingsService, connectionsApi, filesApi) deliveryProcessor = new DeliveryProcessor( @@ -74,8 +73,7 @@ object Resources extends LazyLogging { encryption, fb, profileStore, - parquetTransformer, - blocker + parquetTransformer ) eventConsumer = new EventConsumer(eventsApi, revisionStore, xa) @@ -90,9 +88,9 @@ object Resources extends LazyLogging { def baseAppApiClient(config: Config, ssm: AWSSimpleSystemsManagement): Resource[IO, BaseAppApiClient.Ops[IO]] = for { - blazeClient <- BlazeClientBuilder[IO](scala.concurrent.ExecutionContext.Implicits.global).resource - id <- Resource.eval(resolve(blocker, ssm, config.narrativeApi.clientId)).map(ClientId.apply) - secret <- Resource.eval(resolve(blocker, ssm, config.narrativeApi.clientSecret)).map(ClientSecret.apply) + blazeClient <- EmberClientBuilder.default[IO].build + id <- Resource.eval(resolve(ssm, config.narrativeApi.clientId)).map(ClientId.apply) + secret <- Resource.eval(resolve(ssm, config.narrativeApi.clientSecret)).map(ClientSecret.apply) narrativeApiClient <- config.stage match { case Stage.Prod => BaseAppApiClient.production(blazeClient, id.value, secret.value) @@ -112,26 +110,24 @@ object Resources extends LazyLogging { ssm: AWSSimpleSystemsManagement ): Resource[IO, TokenEncryptionService.Ops[IO]] = for { kms <- awsKms(awsCredentials) - keyId <- Resource.eval(resolve(blocker, ssm, config.kms.tokenEncryptionKeyId)).map(KmsKeyId.apply) - } yield new TokenEncryptionService(blocker, keyId, kms) + keyId <- Resource.eval(resolve(ssm, config.kms.tokenEncryptionKeyId)).map(KmsKeyId.apply) + } yield new TokenEncryptionService(keyId, kms) - private def facebookClient(config: Config, ssm: AWSSimpleSystemsManagement)(implicit - timer: Temporal[IO] - ): IO[FacebookClient.Ops[IO]] = for { - id <- resolve(blocker, ssm, config.facebook.appId).map(FacebookApp.Id.apply) - secret <- resolve(blocker, ssm, config.facebook.appSecret).map(FacebookApp.Secret.apply) - } yield new FacebookClient(FacebookApp(id, secret), blocker) + private def facebookClient(config: Config, ssm: AWSSimpleSystemsManagement): IO[FacebookClient.Ops[IO]] = for { + id <- resolve(ssm, config.facebook.appId).map(FacebookApp.Id.apply) + secret <- resolve(ssm, config.facebook.appSecret).map(FacebookApp.Secret.apply) + } yield new FacebookClient(FacebookApp(id, secret)) - private def transactor(ssm: AWSSimpleSystemsManagement, + private def transactor( + ssm: AWSSimpleSystemsManagement, db: Config.Database, parallelizationFactor: Int ): Resource[IO, Transactor[IO]] = for { - username <- Resource.eval(resolve(blocker, ssm, db.username)) - password <- Resource.eval(resolve(blocker, ssm, db.password)) - jdbcUrl <- Resource.eval(resolve(blocker, ssm, db.jdbcUrl)) + username <- Resource.eval(resolve(ssm, db.username)) + password <- Resource.eval(resolve(ssm, db.password)) + jdbcUrl <- Resource.eval(resolve(ssm, db.jdbcUrl)) connectEC <- ExecutionContexts.fixedThreadPool[IO](32) xa <- HikariTransactor.newHikariTransactor[IO]( - blocker = blocker, connectEC = connectEC, driverClassName = "org.postgresql.Driver", pass = password, @@ -148,8 +144,7 @@ object Resources extends LazyLogging { case Config.Literal(value) => IO.pure(value) case Config.SsmParam(value, encrypted) => - blocker - .blockOn(IO(ssm.getParameter(new GetParameterRequest().withName(value).withWithDecryption(encrypted)))) + IO.blocking(ssm.getParameter(new GetParameterRequest().withName(value).withWithDecryption(encrypted))) .map(_.getParameter.getValue) .attempt .map {