Skip to content

Commit

Permalink
Upgrade to Cats Effect 3 ecosystem (close #837)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Dec 6, 2023
1 parent ccdc185 commit 411fd35
Show file tree
Hide file tree
Showing 166 changed files with 1,696 additions and 1,679 deletions.
8 changes: 8 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ lazy val pubsub = project
.settings(libraryDependencies ++= pubsubDependencies)
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(common % "compile->compile;test->test")
.dependsOn(commonFs2 % "test->test;compile->compile")
.dependsOn(gcpUtils % "compile->compile")

Expand All @@ -80,6 +81,7 @@ lazy val pubsubDistroless = project
.settings(libraryDependencies ++= pubsubDependencies)
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(common % "compile->compile;test->test")
.dependsOn(commonFs2 % "test->test;compile->compile")
.dependsOn(gcpUtils % "compile->compile")

Expand All @@ -90,6 +92,7 @@ lazy val kinesis = project
.settings(libraryDependencies ++= kinesisDependencies)
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(common % "compile->compile;test->test")
.dependsOn(commonFs2 % "test->test;compile->compile")
.dependsOn(awsUtils % "compile->compile")

Expand All @@ -105,6 +108,7 @@ lazy val kinesisDistroless = project
))
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(common % "compile->compile;test->test")
.dependsOn(commonFs2 % "compile->compile;it->it")
.dependsOn(awsUtils % "compile->compile")
.settings(Defaults.itSettings)
Expand All @@ -124,6 +128,7 @@ lazy val kafka = project
.settings(Defaults.itSettings)
.configs(IntegrationTest)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(common % "compile->compile;test->test")
.dependsOn(commonFs2 % "compile->compile;test->test;it->it")
.dependsOn(awsUtils % "compile->compile")
.dependsOn(gcpUtils % "compile->compile")
Expand All @@ -137,6 +142,7 @@ lazy val kafkaDistroless = project
.settings(libraryDependencies ++= kafkaDependencies)
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(common % "compile->compile;test->test")
.dependsOn(commonFs2)
.dependsOn(awsUtils % "compile->compile")
.dependsOn(gcpUtils % "compile->compile")
Expand All @@ -149,6 +155,7 @@ lazy val nsq = project
.settings(libraryDependencies ++= nsqDependencies)
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(common % "compile->compile;test->test")
.dependsOn(commonFs2 % "compile->compile;test->test")
.dependsOn(awsUtils % "compile->compile")
.dependsOn(gcpUtils % "compile->compile")
Expand All @@ -166,6 +173,7 @@ lazy val nsqDistroless = project
))
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(common % "compile->compile;test->test")
.dependsOn(commonFs2 % "compile->compile;it->it")
.dependsOn(awsUtils % "compile->compile")
.dependsOn(gcpUtils % "compile->compile")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.enrich.aws
import java.net.URI

import cats.implicits._
import cats.effect.{ConcurrentEffect, Resource, Timer}
import cats.effect.kernel.{Async, Resource, Sync}

import fs2.Stream

Expand All @@ -31,9 +31,9 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.{Client, Retr

object S3Client {

def mk[F[_]: ConcurrentEffect: Timer]: Resource[F, Client[F]] =
def mk[F[_]: Async]: Resource[F, Client[F]] =
for {
s3Client <- Resource.fromAutoCloseable(ConcurrentEffect[F].delay(S3AsyncClient.builder().region(getRegion).build()))
s3Client <- Resource.fromAutoCloseable(Sync[F].delay(S3AsyncClient.builder().region(getRegion).build()))
store <- Resource.eval(S3Store.builder[F](s3Client).build.toEither.leftMap(_.head).pure[F].rethrow)
} yield new Client[F] {
def canDownload(uri: URI): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,28 @@
*/
package com.snowplowanalytics.snowplow.enrich.azure

import java.net.URI

import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.storage.blob.{BlobServiceClientBuilder, BlobUrlParts}

import blobstore.azure.AzureStore
import blobstore.url.exception.{AuthorityParseError, MultipleUrlValidationException, Throwables}
import blobstore.url.{Authority, Path, Url}

import cats.data.Validated.{Invalid, Valid}
import cats.data.ValidatedNec
import cats.effect._
import cats.implicits._
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.storage.blob.{BlobServiceClientBuilder, BlobUrlParts}

import cats.effect.kernel.{Async, Resource, Sync}

import fs2.Stream
import java.net.URI

import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client

object AzureStorageClient {

def mk[F[_]: ConcurrentEffect](storageAccountName: String): Resource[F, Client[F]] =
def mk[F[_]: Async](storageAccountName: String): Resource[F, Client[F]] =
for {
store <- createStore(storageAccountName)
} yield new Client[F] {
Expand All @@ -41,10 +47,10 @@ object AzureStorageClient {
}
}

private def createStore[F[_]: ConcurrentEffect: Async](storageAccountName: String): Resource[F, AzureStore[F]] =
private def createStore[F[_]: Async](storageAccountName: String): Resource[F, AzureStore[F]] =
for {
client <- Resource.eval {
ConcurrentEffect[F].delay {
Sync[F].delay {
val builder = new BlobServiceClientBuilder().credential(new DefaultAzureCredentialBuilder().build)
val storageEndpoint = createStorageEndpoint(storageAccountName)
builder.endpoint(storageEndpoint).buildAsyncClient()
Expand All @@ -54,7 +60,7 @@ object AzureStorageClient {
.builder[F](client)
.build
.fold(
errors => Resource.eval(ConcurrentEffect[F].raiseError(errors.reduce(Throwables.collapsingSemigroup))),
errors => Resource.eval(Sync[F].raiseError(errors.reduce(Throwables.collapsingSemigroup))),
s => Resource.pure[F, AzureStore[F]](s)
)
} yield store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.enrich.gcp
import java.net.URI

import cats.implicits._
import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Timer}
import cats.effect.{Async, Sync}

import fs2.Stream

Expand All @@ -29,10 +29,10 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.{Client, Retr

object GcsClient {

def mk[F[_]: ConcurrentEffect: ContextShift: Timer](blocker: Blocker): F[Client[F]] =
ConcurrentEffect[F].delay(StorageOptions.getDefaultInstance.getService).map { service =>
def mk[F[_]: Async]: F[Client[F]] =
Sync[F].delay(StorageOptions.getDefaultInstance.getService).map { service =>
new Client[F] {
val store = GcsStore.builder(service, blocker).unsafe
val store = GcsStore.builder(service).unsafe

def canDownload(uri: URI): Boolean = uri.getScheme == "gs"

Expand Down
Loading

0 comments on commit 411fd35

Please sign in to comment.