Skip to content

Commit

Permalink
feature: transactions with single-threaded execution context (#685)
Browse files Browse the repository at this point in the history
  • Loading branch information
gvolpe authored Apr 20, 2022
1 parent ebaa90f commit 4f5ae56
Show file tree
Hide file tree
Showing 31 changed files with 842 additions and 1,385 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ sealed abstract case class RedisClient private (underlying: JRedisClient, uri: R

object RedisClient {

private[redis4cats] def acquireAndRelease[F[_]: Apply: FutureLift: Log: RedisExecutor](
private[redis4cats] def acquireAndRelease[F[_]: Apply: FutureLift: Log](
uri: => RedisURI,
opts: ClientOptions,
config: Redis4CatsConfig
): (F[RedisClient], RedisClient => F[Unit]) = {
val acquire: F[RedisClient] = RedisExecutor[F].lift {
val acquire: F[RedisClient] = FutureLift[F].delay {
val jClient: JRedisClient = JRedisClient.create(uri.underlying)
jClient.setOptions(opts)
new RedisClient(jClient, uri) {}
Expand All @@ -44,25 +44,23 @@ object RedisClient {
Log[F].info(s"Releasing Redis connection: $uri") *>
FutureLift[F]
.liftCompletableFuture(
RedisExecutor[F].lift(
client.underlying.shutdownAsync(
config.shutdown.quietPeriod.toNanos,
config.shutdown.timeout.toNanos,
TimeUnit.NANOSECONDS
)
client.underlying.shutdownAsync(
config.shutdown.quietPeriod.toNanos,
config.shutdown.timeout.toNanos,
TimeUnit.NANOSECONDS
)
)
.void

(acquire, release)
}

private[redis4cats] def acquireAndReleaseWithoutUri[F[_]: FutureLift: Log: MonadThrow: RedisExecutor](
private[redis4cats] def acquireAndReleaseWithoutUri[F[_]: FutureLift: Log: MonadThrow](
opts: ClientOptions,
config: Redis4CatsConfig
): F[(F[RedisClient], RedisClient => F[Unit])] =
RedisExecutor[F]
.lift(RedisURI.fromUnderlying(new JRedisURI()))
FutureLift[F]
.delay(RedisURI.fromUnderlying(new JRedisURI()))
.map(uri => acquireAndRelease(uri, opts, config))

class RedisClientPartiallyApplied[F[_]: MkRedis: MonadThrow] {
Expand Down Expand Up @@ -142,11 +140,10 @@ object RedisClient {
uri: => RedisURI,
opts: ClientOptions,
config: Redis4CatsConfig = Redis4CatsConfig()
): Resource[F, RedisClient] =
MkRedis[F].newExecutor.flatMap { implicit ec =>
val (acquire, release) = acquireAndRelease(uri, opts, config)
Resource.make(acquire)(release)
}
): Resource[F, RedisClient] = {
val (acquire, release) = acquireAndRelease(uri, opts, config)
Resource.make(acquire)(release)
}
}

def apply[F[_]: MkRedis: MonadThrow]: RedisClientPartiallyApplied[F] = new RedisClientPartiallyApplied[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,41 +38,39 @@ sealed abstract case class RedisClusterClient private (underlying: JClusterClien

object RedisClusterClient {

private[redis4cats] def acquireAndRelease[F[_]: FlatMap: FutureLift: Log: RedisExecutor](
private[redis4cats] def acquireAndRelease[F[_]: FlatMap: FutureLift: Log](
config: Redis4CatsConfig,
uri: RedisURI*
): (F[RedisClusterClient], RedisClusterClient => F[Unit]) = {

val acquire: F[RedisClusterClient] =
Log[F].info(s"Acquire Redis Cluster client") *>
RedisExecutor[F]
.lift(JClusterClient.create(uri.map(_.underlying).asJava))
FutureLift[F]
.delay(JClusterClient.create(uri.map(_.underlying).asJava))
.flatTap(initializeClusterTopology[F](_, config.topologyViewRefreshStrategy, config.nodeFilter))
.map(new RedisClusterClient(_) {})

val release: RedisClusterClient => F[Unit] = client =>
Log[F].info(s"Releasing Redis Cluster client: ${client.underlying}") *>
FutureLift[F]
.liftCompletableFuture(
RedisExecutor[F].lift(
client.underlying.shutdownAsync(
config.shutdown.quietPeriod.toNanos,
config.shutdown.timeout.toNanos,
TimeUnit.NANOSECONDS
)
client.underlying.shutdownAsync(
config.shutdown.quietPeriod.toNanos,
config.shutdown.timeout.toNanos,
TimeUnit.NANOSECONDS
)
)
.void

(acquire, release)
}

private[redis4cats] def initializeClusterTopology[F[_]: Functor: RedisExecutor](
private[redis4cats] def initializeClusterTopology[F[_]: Functor: FutureLift](
client: JClusterClient,
topologyViewRefreshStrategy: TopologyViewRefreshStrategy,
nodeFilter: RedisClusterNode => Boolean
): F[Unit] =
RedisExecutor[F].lift {
FutureLift[F].delay {
topologyViewRefreshStrategy match {
case NoRefresh =>
client.setOptions(
Expand Down Expand Up @@ -122,14 +120,13 @@ object RedisClusterClient {
def configured[F[_]: FlatMap: MkRedis](
config: Redis4CatsConfig,
uri: RedisURI*
): Resource[F, RedisClusterClient] =
MkRedis[F].newExecutor.flatMap { implicit redisExecutor =>
implicit val fl: FutureLift[F] = MkRedis[F].futureLift
implicit val log: Log[F] = MkRedis[F].log
): Resource[F, RedisClusterClient] = {
implicit val fl: FutureLift[F] = MkRedis[F].futureLift
implicit val log: Log[F] = MkRedis[F].log

val (acquire, release) = acquireAndRelease(config, uri: _*)
Resource.make(acquire)(release)
}
val (acquire, release) = acquireAndRelease(config, uri: _*)
Resource.make(acquire)(release)
}

def fromUnderlying(underlying: JClusterClient): RedisClusterClient =
new RedisClusterClient(underlying) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import cats.{ ApplicativeThrow, MonadThrow }
import cats.effect.kernel.Async
import cats.syntax.all._
import dev.profunktor.redis4cats.data.NodeId
import dev.profunktor.redis4cats.effect.{ FutureLift, RedisExecutor }
import dev.profunktor.redis4cats.effect.FutureLift
import io.lettuce.core.api.StatefulRedisConnection
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.api.sync.{ RedisCommands => RedisSyncCommands }
Expand All @@ -43,41 +43,36 @@ private[redis4cats] trait RedisConnection[F[_], K, V] {
def liftK[G[_]: Async]: RedisConnection[G, K, V]
}

private[redis4cats] class RedisStatefulConnection[F[_]: ApplicativeThrow: FutureLift: RedisExecutor, K, V](
private[redis4cats] class RedisStatefulConnection[F[_]: ApplicativeThrow: FutureLift, K, V](
conn: StatefulRedisConnection[K, V]
) extends RedisConnection[F, K, V] {
def sync: F[RedisSyncCommands[K, V]] = RedisExecutor[F].delay(conn.sync())
def sync: F[RedisSyncCommands[K, V]] = FutureLift[F].delay(conn.sync())
def clusterSync: F[RedisClusterSyncCommands[K, V]] =
OperationNotSupported("Running in a single node").raiseError
def async: F[RedisAsyncCommands[K, V]] = RedisExecutor[F].delay(conn.async())
def async: F[RedisAsyncCommands[K, V]] = FutureLift[F].delay(conn.async())
def clusterAsync: F[RedisClusterAsyncCommands[K, V]] =
OperationNotSupported("Running in a single node").raiseError
def close: F[Unit] = FutureLift[F].liftCompletableFuture(RedisExecutor[F].delay(conn.closeAsync())).void
def close: F[Unit] = FutureLift[F].liftCompletableFuture(conn.closeAsync()).void
def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] =
OperationNotSupported("Running in a single node").raiseError
def liftK[G[_]: Async]: RedisConnection[G, K, V] = {
implicit val ecG: RedisExecutor[G] = RedisExecutor[F].liftK[G]
def liftK[G[_]: Async]: RedisConnection[G, K, V] =
new RedisStatefulConnection[G, K, V](conn)
}
}

private[redis4cats] class RedisStatefulClusterConnection[F[_]: FutureLift: MonadThrow: RedisExecutor, K, V](
private[redis4cats] class RedisStatefulClusterConnection[F[_]: FutureLift: MonadThrow, K, V](
conn: StatefulRedisClusterConnection[K, V]
) extends RedisConnection[F, K, V] {
def sync: F[RedisSyncCommands[K, V]] =
OperationNotSupported("Transactions are not supported in a cluster. You must select a single node.").raiseError
def async: F[RedisAsyncCommands[K, V]] =
OperationNotSupported("Transactions are not supported in a cluster. You must select a single node.").raiseError
def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = RedisExecutor[F].delay(conn.async())
def clusterSync: F[RedisClusterSyncCommands[K, V]] = RedisExecutor[F].delay(conn.sync())
def close: F[Unit] =
FutureLift[F].liftCompletableFuture(RedisExecutor[F].delay(conn.closeAsync())).void
def clusterAsync: F[RedisClusterAsyncCommands[K, V]] = FutureLift[F].delay(conn.async())
def clusterSync: F[RedisClusterSyncCommands[K, V]] = FutureLift[F].delay(conn.sync())
def close: F[Unit] = FutureLift[F].liftCompletableFuture(conn.closeAsync()).void
def byNode(nodeId: NodeId): F[RedisAsyncCommands[K, V]] =
FutureLift[F].liftCompletableFuture(RedisExecutor[F].delay(conn.getConnectionAsync(nodeId.value))).flatMap {
stateful => RedisExecutor[F].delay(stateful.async())
FutureLift[F].liftCompletableFuture(conn.getConnectionAsync(nodeId.value)).flatMap { stateful =>
FutureLift[F].delay(stateful.async())
}
def liftK[G[_]: Async]: RedisConnection[G, K, V] = {
implicit val ecG: RedisExecutor[G] = RedisExecutor[F].liftK[G]
def liftK[G[_]: Async]: RedisConnection[G, K, V] =
new RedisStatefulClusterConnection[G, K, V](conn)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import cats.syntax.all._
import dev.profunktor.redis4cats.JavaConversions._
import dev.profunktor.redis4cats.config.Redis4CatsConfig
import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.effect.{ FutureLift, Log, RedisExecutor }
import dev.profunktor.redis4cats.effect.{ FutureLift, Log }
import io.lettuce.core.masterreplica.{ MasterReplica, StatefulRedisMasterReplicaConnection }
import io.lettuce.core.{ ClientOptions, ReadFrom => JReadFrom }

Expand All @@ -32,7 +32,7 @@ sealed abstract case class RedisMasterReplica[K, V] private (underlying: Statefu

object RedisMasterReplica {

private[redis4cats] def acquireAndRelease[F[_]: FutureLift: Log: RedisExecutor: Sync, K, V](
private[redis4cats] def acquireAndRelease[F[_]: FutureLift: Log: Sync, K, V](
client: RedisClient,
codec: RedisCodec[K, V],
readFrom: Option[JReadFrom],
Expand All @@ -44,9 +44,7 @@ object RedisMasterReplica {
val connection: F[RedisMasterReplica[K, V]] =
FutureLift[F]
.liftCompletableFuture[StatefulRedisMasterReplicaConnection[K, V]](
Sync[F].delay {
MasterReplica.connectAsync[K, V](client.underlying, codec.underlying, uris.map(_.underlying).asJava)
}
MasterReplica.connectAsync[K, V](client.underlying, codec.underlying, uris.map(_.underlying).asJava)
)
.map(new RedisMasterReplica(_) {})

Expand All @@ -55,7 +53,7 @@ object RedisMasterReplica {

val release: RedisMasterReplica[K, V] => F[Unit] = connection =>
Log[F].info(s"Releasing Redis Master/Replica connection: ${connection.underlying}") *>
FutureLift[F].liftCompletableFuture(Sync[F].delay(connection.underlying.closeAsync())).void
FutureLift[F].liftCompletableFuture(connection.underlying.closeAsync()).void

(acquire, release)
}
Expand Down Expand Up @@ -108,14 +106,12 @@ object RedisMasterReplica {
config: Redis4CatsConfig,
uris: RedisURI*
)(readFrom: Option[JReadFrom] = None): Resource[F, RedisMasterReplica[K, V]] =
RedisExecutor.make[F].flatMap { implicit redisExecutor =>
Resource.eval(RedisClient.acquireAndReleaseWithoutUri[F](opts, config)).flatMap {
case (acquireClient, releaseClient) =>
Resource.make(acquireClient)(releaseClient).flatMap { client =>
val (acquire, release) = acquireAndRelease(client, codec, readFrom, uris: _*)
Resource.make(acquire)(release)
}
}
Resource.eval(RedisClient.acquireAndReleaseWithoutUri[F](opts, config)).flatMap {
case (acquireClient, releaseClient) =>
Resource.make(acquireClient)(releaseClient).flatMap { client =>
val (acquire, release) = acquireAndRelease(client, codec, readFrom, uris: _*)
Resource.make(acquire)(release)
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ import io.lettuce.core.{ ConnectionFuture, RedisFuture }
import java.util.concurrent._

private[redis4cats] trait FutureLift[F[_]] {
def lift[A](fa: F[RedisFuture[A]])(implicit F: RedisExecutor[F]): F[A]
def liftConnectionFuture[A](fa: F[ConnectionFuture[A]])(implicit F: RedisExecutor[F]): F[A]
def liftCompletableFuture[A](fa: F[CompletableFuture[A]])(implicit F: RedisExecutor[F]): F[A]
def delay[A](thunk: => A): F[A]
def lift[A](fa: => RedisFuture[A]): F[A]
def liftConnectionFuture[A](fa: => ConnectionFuture[A]): F[A]
def liftCompletableFuture[A](fa: => CompletableFuture[A]): F[A]
}

object FutureLift {
Expand All @@ -36,40 +37,40 @@ object FutureLift {

implicit def forAsync[F[_]: Async]: FutureLift[F] =
new FutureLift[F] {
def lift[A](fa: F[RedisFuture[A]])(implicit F: RedisExecutor[F]): F[A] =
val F = Async[F]

def delay[A](thunk: => A): F[A] = F.delay(thunk)

def lift[A](fa: => RedisFuture[A]): F[A] =
liftJFuture[RedisFuture[A], A](fa)

def liftConnectionFuture[A](fa: F[ConnectionFuture[A]])(implicit F: RedisExecutor[F]): F[A] =
def liftConnectionFuture[A](fa: => ConnectionFuture[A]): F[A] =
liftJFuture[ConnectionFuture[A], A](fa)

def liftCompletableFuture[A](fa: F[CompletableFuture[A]])(implicit F: RedisExecutor[F]): F[A] =
def liftCompletableFuture[A](fa: => CompletableFuture[A]): F[A] =
liftJFuture[CompletableFuture[A], A](fa)

private[redis4cats] def liftJFuture[G <: JFuture[A], A](fa: F[G])(implicit F: RedisExecutor[F]): F[A] =
F.eval {
fa.flatMap[A] { f =>
Async[F].async { cb =>
F.delay {
f.handle[Unit] { (res: A, err: Throwable) =>
err match {
case null =>
cb(Right(res))
case _: CancellationException =>
()
case ex: CompletionException if ex.getCause ne null =>
cb(Left(ex.getCause))
case ex =>
cb(Left(ex))
}
}
private[redis4cats] def liftJFuture[G <: JFuture[A], A](f: => G): F[A] =
F.async { cb =>
F.delay {
f.handle[Unit] { (res: A, err: Throwable) =>
err match {
case null =>
cb(Right(res))
case _: CancellationException =>
()
case ex: CompletionException if ex.getCause ne null =>
cb(Left(ex.getCause))
case ex =>
cb(Left(ex))
}
.as(Some(F.delay(f.cancel(true)).void))
}
}
}
.as(Some(F.delay(f.cancel(true)).void))
}
}

implicit final class FutureLiftOps[F[_]: ApplicativeThrow: FutureLift: Log: RedisExecutor, A](fa: F[RedisFuture[A]]) {
implicit final class FutureLiftOps[F[_]: ApplicativeThrow: FutureLift: Log, A](fa: => RedisFuture[A]) {
def futureLift: F[A] =
FutureLift[F].lift(fa).onError {
case e: ExecutionException => Log[F].error(s"${e.getMessage()} - ${Option(e.getCause())}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.annotation.implicitNotFound

/**
* MkRedis is a capability trait that abstracts over the creation of RedisClient,
* RedisClusterClient, RedisExecutor, among other things.
* RedisClusterClient, among other things.
*
* It serves the internal purpose to orchastrate creation of such instances while
* avoiding impure constraints such as `Async` or `Sync`.
Expand All @@ -46,8 +46,6 @@ sealed trait MkRedis[F[_]] {

def clusterClient(uri: RedisURI*): Resource[F, RedisClusterClient]

private[redis4cats] def newExecutor: Resource[F, RedisExecutor[F]]
private[redis4cats] def newExecutor(threadPoolSize: Int): Resource[F, RedisExecutor[F]]
private[redis4cats] def futureLift: FutureLift[F]
private[redis4cats] def log: Log[F]
}
Expand Down Expand Up @@ -76,12 +74,6 @@ object MkRedis {
def clusterClient(uri: RedisURI*): Resource[F, RedisClusterClient] =
RedisClusterClient[F](uri: _*)

private[redis4cats] def newExecutor: Resource[F, RedisExecutor[F]] =
newExecutor(1)

private[redis4cats] def newExecutor(threadPoolSize: Int): Resource[F, RedisExecutor[F]] =
RedisExecutor.make[F](threadPoolSize)

private[redis4cats] def futureLift: FutureLift[F] = implicitly

private[redis4cats] def log: Log[F] = implicitly
Expand Down
Loading

0 comments on commit 4f5ae56

Please sign in to comment.