From 754a29454c8db30441d4969e9dd4121f39b270b6 Mon Sep 17 00:00:00 2001 From: Gabriel Volpe Date: Wed, 20 Apr 2022 18:02:53 +0200 Subject: [PATCH] pipelinines: reliable new implementation --- .../dev/profunktor/redis4cats/hlist.scala | 180 ------- .../profunktor/redis4cats/HListSuite.scala | 75 --- .../dev/profunktor/redis4cats/pipeline.scala | 64 --- .../dev/profunktor/redis4cats/runner.scala | 114 ----- .../profunktor/redis4cats/tx/RedisPipe.scala | 56 +++ .../profunktor/redis4cats/tx/RedisTx.scala | 52 +- .../profunktor/redis4cats/tx/TxRunner.scala | 50 ++ .../profunktor/redis4cats/tx/TxStore.scala | 38 ++ .../profunktor/redis4cats/tx/package.scala | 1 + .../redis4cats/RedisPipelineDemo.scala | 48 +- .../profunktor/redis4cats/RedisTxDemo.scala | 8 +- .../dev/profunktor/redis4cats/RedisSpec.scala | 5 +- .../profunktor/redis4cats/TestScenarios.scala | 458 +++++++++--------- site/docs/pipelining.md | 65 +-- site/docs/transactions.md | 4 +- 15 files changed, 433 insertions(+), 785 deletions(-) delete mode 100644 modules/core/src/main/scala/dev/profunktor/redis4cats/hlist.scala delete mode 100644 modules/core/src/test/scala/dev/profunktor/redis4cats/HListSuite.scala delete mode 100644 modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala delete mode 100644 modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala create mode 100644 modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/RedisPipe.scala create mode 100644 modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/TxRunner.scala create mode 100644 modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/TxStore.scala diff --git a/modules/core/src/main/scala/dev/profunktor/redis4cats/hlist.scala b/modules/core/src/main/scala/dev/profunktor/redis4cats/hlist.scala deleted file mode 100644 index 90737aa7..00000000 --- a/modules/core/src/main/scala/dev/profunktor/redis4cats/hlist.scala +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Copyright 2018-2021 ProfunKtor - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.profunktor.redis4cats - -import scala.annotation.nowarn - -/** - * An heterogeneous list, mainly used to operate on transactions. - * - * Highly inspired by Shapeless machinery but very much lightweight. - */ -object hlist extends TypeInequalityCompat { - - type ::[H, T <: HList] = HCons[H, T] - type HNil = HNil.type - - sealed trait HList { - def ::[A](a: A): HCons[A, this.type] = HCons(a, this) - - def reverse: HList = { - def go(ys: HList, res: HList): HList = - ys match { - case HNil => res - case HCons(h, t) => go(t, h :: res) - } - go(this, HNil) - } - - def size: Int = { - def go(ys: HList, acc: Int): Int = - ys match { - case HNil => acc - case HCons(_, t) => go(t, acc + 1) - } - go(this, 0) - } - } - - final case class HCons[+H, +Tail <: HList](head: H, tail: Tail) extends HList - case object HNil extends HList - - object HList { - def fromList[A](list: List[A]): HList = - list match { - case Nil => HNil - case (h :: t) => HCons(h, fromList(t)) - } - - implicit class HListOps[T <: HList](t: T) { - def filterUnit(implicit w: Filter[T]): w.R = { - def go(ys: HList, res: HList): HList = - ys match { - case HNil => res - case HCons(h, t) if h.isInstanceOf[Unit] => go(t, res) - case HCons(h, t) => go(t, h :: res) - } - go(t, HNil).reverse.asInstanceOf[w.R] - } - } - } - - object ~: { - def unapply[H, T <: HList](l: H :: T): Some[(H, T)] = Some((l.head, l.tail)) - } - - /** - * It witnesses a relationship between two HLists. - * - * The existing instances model a relationship between an HList comformed - * of actions F[A] and results A. E.g.: - * - * {{{ - * val actions: IO[Unit] :: IO[String] :: HNil = IO.unit :: IO.pure("hi") :: HNil - * val results: actions.R = () :: "hi" :: HNil - * }}} - * - * A Witness[IO[Unit] :: IO[String] :: HNil] proves that its result type can - * only be Unit :: String :: HNil. - * - * A Witness is sealed to avoid the creation of invalid instances. - */ - sealed trait Witness[T <: HList] { - type R <: HList - } - - object Witness { - type Aux[T0 <: HList, R0 <: HList] = Witness[T0] { type R = R0 } - - implicit val hnil: Witness.Aux[HNil, HNil] = - new Witness[HNil] { type R = HNil } - - implicit def hcons[F[_], A, T <: HList](implicit w: Witness[T]): Witness.Aux[HCons[F[A], T], HCons[A, w.R]] = - new Witness[HCons[F[A], T]] { type R = HCons[A, w.R] } - } - - /* - * It represents a relationship between a raw list and a - * filtered one. Mainly used to filter out values of type Unit. - */ - sealed trait Filter[T <: HList] { - type R <: HList - } - - object Filter { - type Aux[T0 <: HList, R0 <: HList] = Filter[T0] { type R = R0 } - - implicit val hnil: Filter.Aux[HNil, HNil] = - new Filter[HNil] { type R = HNil } - - implicit def hconsUnit[T <: HList](implicit w: Filter[T]): Filter.Aux[HCons[Unit, T], w.R] = - new Filter[HCons[Unit, T]] { type R = w.R } - - @nowarn - implicit def hconsNotUnit[A: =!=[Unit, *], T <: HList](implicit w: Filter[T]): Filter.Aux[HCons[A, T], A :: w.R] = - new Filter[HCons[A, T]] { type R = A :: w.R } - } - - /* - * Wraps a Witness and a Filter, where the input type of the Filter - * is the output type of the Witness. Facilitates expressing a dependent - * typing relation between T and S. - */ - sealed trait WitnessFilter[T <: HList] { - type S <: HList - - implicit val witness: Witness[T] - implicit val filter: Filter.Aux[witness.R, S] - } - - object WitnessFilter { - type Aux[T <: HList, S0 <: HList] = WitnessFilter[T] { - type S = S0 - } - - implicit val hnil: WitnessFilter.Aux[HNil, HNil] = new WitnessFilter[HNil] { - type S = HNil - - val witness: Witness.Aux[HNil, HNil] = implicitly - val filter: Filter.Aux[HNil, HNil] = implicitly - } - - implicit def hconsUnit[F[_], T <: HList]( - implicit w: WitnessFilter[T] - ): WitnessFilter.Aux[HCons[F[Unit], T], w.S] = - new WitnessFilter[HCons[F[Unit], T]] { - type S = w.S - - import w.{ witness => witnessT, filter => filterT } - - val witness: Witness.Aux[HCons[F[Unit], T], HCons[Unit, w.witness.R]] = implicitly - val filter: Filter.Aux[HCons[Unit, w.witness.R], w.S] = implicitly - } - - implicit def hconsNotUnit[F[_], A: =!=[Unit, *], T <: HList]( - implicit w: WitnessFilter[T] - ): WitnessFilter.Aux[HCons[F[A], T], HCons[A, w.S]] = - new WitnessFilter[HCons[F[A], T]] { - type S = HCons[A, w.S] - - import w.{ witness => witnessT, filter => filterT } - - val witness: Witness.Aux[HCons[F[A], T], HCons[A, w.witness.R]] = implicitly - val filter: Filter.Aux[A :: w.witness.R, A :: w.S] = implicitly - } - } -} diff --git a/modules/core/src/test/scala/dev/profunktor/redis4cats/HListSuite.scala b/modules/core/src/test/scala/dev/profunktor/redis4cats/HListSuite.scala deleted file mode 100644 index 151f409c..00000000 --- a/modules/core/src/test/scala/dev/profunktor/redis4cats/HListSuite.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2018-2021 ProfunKtor - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.profunktor.redis4cats - -import cats.effect.IO -import hlist._ -import munit.FunSuite - -class HListSuite extends FunSuite { - - def proof[T <: HList, R <: HList](xs: T)(implicit w: Witness.Aux[T, R]): R = - xs.asInstanceOf[w.R] // can return anything, we only care about the types here - - test("HList and Witness") { - val actions = IO.unit :: IO.pure("hi") :: HNil - - proof(actions): Unit :: String :: HNil - - compileErrors("proof(actions): Unit :: Int :: HNil") - } - - test("Unapply HLists (deconstruct)") { - val hl = () :: "hi" :: 123 :: true :: 's' :: 55 :: HNil - - val () ~: s ~: n1 ~: b ~: c ~: n2 ~: HNil = hl - - assert(s == "hi") - assert(n1 == 123) - assert(b == true) - assert(c == 's') - assert(n2 == 55) - } - - test("Filter out values") { - val unit = () - val hl = unit :: "hi" :: 33 :: unit :: false :: 's' :: unit :: HNil - - val s ~: n ~: b ~: c ~: HNil = hl.filterUnit - - assert(s == "hi") - assert(n == 33) - assert(b == false) - assert(c == 's') - } - - test("Conversion from standard list") { - val lt = List("a", "b", "c") - val hl = "a" :: "b" :: "c" :: HNil - assert(hl == HList.fromList(lt)) - assertEquals(hl.size, lt.size) - - val el = List.empty[Int] - assert(HNil == HList.fromList(el)) - assertEquals(HNil.size, el.size) - - // Temporary hack to convert from dynamic list until we properly fix it - val cmd = HList.fromList(List(IO.unit, IO.pure("hi"))).asInstanceOf[IO[Unit] :: IO[String] :: HNil] - proof(cmd) - } - -} diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala deleted file mode 100644 index 34684e5a..00000000 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/pipeline.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2018-2021 ProfunKtor - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.profunktor.redis4cats - -import scala.util.control.NoStackTrace - -import cats.Applicative -import cats.effect.kernel._ -import dev.profunktor.redis4cats.effect.Log -import dev.profunktor.redis4cats.hlist._ - -object pipeline { - - case object PipelineError extends NoStackTrace - - case class RedisPipeline[F[_]: Async: Log, K, V]( - cmd: RedisCommands[F, K, V] - ) { - - private val ops = - Runner.Ops( - name = "Pipeline", - mainCmd = cmd.disableAutoFlush, - onComplete = (_: Runner.CancelFibers[F]) => cmd.flushCommands, - onError = Applicative[F].unit, - afterCompletion = cmd.enableAutoFlush, - mkError = () => PipelineError - ) - - /** - * Same as @exec, except it filters out values of type Unit - * from its result. - */ - def filterExec[T <: HList](commands: T)(implicit w: WitnessFilter[T]): F[w.S] = - Runner[F].filterExec(ops)(commands) - - /*** - * Exclusively run Redis commands as part of a pipeline (autoflush: disabled). - * - * Once all the commands have been executed, @exec will "flush" them into Redis, - * and finally re-enable autoflush. - * - * @return `F[R]` or raises a @PipelineError in case of failure. - */ - def exec[T <: HList, R <: HList](commands: T)(implicit w: Witness.Aux[T, R]): F[R] = - Runner[F].exec(ops)(commands) - - } - -} diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala deleted file mode 100644 index fe09e63e..00000000 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/runner.scala +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright 2018-2021 ProfunKtor - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dev.profunktor.redis4cats - -import java.util.UUID - -import scala.concurrent.duration._ - -import cats.effect.kernel._ -import cats.effect.kernel.implicits._ -import cats.syntax.all._ -import dev.profunktor.redis4cats.effect.Log -import dev.profunktor.redis4cats.hlist._ -import cats.Applicative - -object Runner { - type CancelFibers[F[_]] = Throwable => F[Unit] - - case class Ops[F[_]]( - name: String, - mainCmd: F[Unit], - onComplete: CancelFibers[F] => F[Unit], - onError: F[Unit], - afterCompletion: F[Unit], - mkError: () => Throwable - ) - - def apply[F[_]: Async: Log]: RunnerPartiallyApplied[F] = - new RunnerPartiallyApplied[F] -} - -private[redis4cats] class RunnerPartiallyApplied[F[_]: Async: Log] { - - def filterExec[T <: HList](ops: Runner.Ops[F])(commands: T)(implicit w: WitnessFilter[T]): F[w.S] = { - import w._ - exec[T](ops)(commands).map(_.filterUnit) - } - - def exec[T <: HList](ops: Runner.Ops[F])(commands: T)(implicit w: Witness[T]): F[w.R] = - (Deferred[F, Either[Throwable, w.R]], Sync[F].delay(UUID.randomUUID)).tupled.flatMap { - case (promise, uuid) => - def cancelFibers[A](fibs: HList)(err: Throwable): F[Unit] = - joinOrCancel(fibs, HNil)(false) >> promise.complete(err.asLeft).ensure(promiseAlreadyCompleted)(identity).void - - def onErrorOrCancelation(fibs: HList): F[Unit] = - cancelFibers(fibs)(ops.mkError()).guarantee(ops.onError) - - (Deferred[F, Unit], Ref.of[F, Int](0)).tupled - .flatMap { - case (gate, counter) => - // wait for commands to be scheduled - val synchronizer: F[Unit] = - counter.modify { - case n if n === (commands.size - 1) => - n + 1 -> gate.complete(()).ensure(promiseAlreadyCompleted)(identity).void - case n => n + 1 -> Applicative[F].unit - }.flatten - - Log[F].debug(s"${ops.name} started - ID: $uuid") >> - (ops.mainCmd >> runner(synchronizer, commands, HNil)) - .bracketCase(_ => gate.get) { - case (fibs, Outcome.Succeeded(_)) => - for { - _ <- Log[F].debug(s"${ops.name} completed - ID: $uuid") - _ <- ops.onComplete(cancelFibers(fibs)) - r <- joinOrCancel(fibs, HNil)(true) - // Casting here is fine since we have a `Witness` that proves this true - _ <- promise.complete(r.asInstanceOf[w.R].asRight).ensure(promiseAlreadyCompleted)(identity) - } yield () - case (fibs, Outcome.Errored(e)) => - Log[F].error(s"${ops.name} failed: ${e.getMessage} - ID: $uuid") >> onErrorOrCancelation(fibs) - case (fibs, Outcome.Canceled()) => - Log[F].error(s"${ops.name} canceled - ID: $uuid") >> onErrorOrCancelation(fibs) - } - .guarantee(ops.afterCompletion) >> promise.get.rethrow.timeout(3.seconds) - } - } - - // Forks every command in order - private def runner[H <: HList, G <: HList](f: F[Unit], ys: H, res: G): F[HList] = - ys match { - case HNil => res.pure[F].widen - case HCons((h: F[_] @unchecked), t) => (h, f).parTupled.map(_._1).start.flatMap(fb => runner(f, t, fb :: res)) - case HCons(h, t) => Log[F].error(s"Unexpected result: ${h.toString}") >> runner(f, t, res) - } - - // Joins or cancel fibers correspondent to previous executed commands - private def joinOrCancel[H <: HList, G <: HList](ys: H, res: G)(isJoin: Boolean): F[HList] = - ys match { - case HNil => Applicative[F].pure(res) - case HCons((h: Fiber[F, Throwable, Any] @unchecked), t) if isJoin => - h.joinWithNever.flatMap(x => joinOrCancel(t, x :: res)(isJoin)) - case HCons((h: Fiber[F, Throwable, Any] @unchecked), t) => - h.cancel.flatMap(x => joinOrCancel(t, x :: res)(isJoin)) - case HCons(h, t) => - Log[F].error(s"Unexpected result: ${h.toString}") >> joinOrCancel(t, res)(isJoin) - } - - private val promiseAlreadyCompleted = new AssertionError("Promise already completed") -} diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/RedisPipe.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/RedisPipe.scala new file mode 100644 index 00000000..75722f2f --- /dev/null +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/RedisPipe.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2018-2021 ProfunKtor + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.profunktor.redis4cats.tx + +import cats.effect.kernel._ +import cats.effect.kernel.syntax.monadCancel._ +import cats.syntax.all._ + +import dev.profunktor.redis4cats.RedisCommands +import dev.profunktor.redis4cats.effect.TxExecutor + +trait RedisPipe[F[_]] { + def exec(fs: List[F[Unit]]): F[Unit] + def run[A](fs: TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]] +} + +object RedisPipe { + + /** + * Note: a single instance of `RedisCommands` can only handle a pipeline at a time. + * + * If you wish to run concurrent pipelines, each of them needs to run a a dedicated + * `RedisCommands` instance. + */ + def make[F[_]: Async, K, V]( + redis: RedisCommands[F, K, V] + ): Resource[F, RedisPipe[F]] = + TxExecutor.make[F].map { txe => + new RedisPipe[F] { + def exec(fs: List[F[Unit]]): F[Unit] = + run((_: TxStore[F, String, String]) => fs).void + + def run[A](fs: TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]] = + TxRunner.run[F, K, V, A]( + acquire = redis.disableAutoFlush, + release = redis.flushCommands.guarantee(redis.enableAutoFlush), + onError = ().pure[F], + t = txe + )(fs) + } + } +} diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/RedisTx.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/RedisTx.scala index 8ca8395d..62fe2f39 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/RedisTx.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/RedisTx.scala @@ -17,7 +17,6 @@ package dev.profunktor.redis4cats.tx import cats.effect.kernel._ -import cats.effect.kernel.syntax.all._ import cats.syntax.all._ import dev.profunktor.redis4cats.RedisCommands @@ -25,29 +24,11 @@ import dev.profunktor.redis4cats.effect.TxExecutor trait RedisTx[F[_]] { def exec(fs: List[F[Unit]]): F[Unit] - def run[A](fs: RedisTx.Store[F, String, A] => List[F[Unit]]): F[Map[String, A]] + def run[A](fs: TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]] } object RedisTx { - /** - * Provides a way to store transactional results for later retrieval. - */ - trait Store[F[_], K, V] { - def get: F[Map[K, V]] - def set(key: K)(v: V): F[Unit] - } - - object Store { - private[redis4cats] def make[F[_]: Async, K, V]: F[Store[F, K, V]] = - Ref.of[F, Map[K, V]](Map.empty).map { ref => - new Store[F, K, V] { - def get: F[Map[K, V]] = ref.get - def set(key: K)(v: V): F[Unit] = ref.update(_.updated(key, v)) - } - } - } - /** * Note: a single instance of `RedisCommands` can only handle a transaction at a time. * @@ -57,27 +38,18 @@ object RedisTx { def make[F[_]: Async, K, V]( redis: RedisCommands[F, K, V] ): Resource[F, RedisTx[F]] = - TxExecutor.make[F].map { t => + TxExecutor.make[F].map { txe => new RedisTx[F] { - def exec(fs: List[F[Unit]]): F[Unit] = run((_: Store[F, String, String]) => fs).void - - def run[A](fs: Store[F, String, A] => List[F[Unit]]): F[Map[String, A]] = - Store.make[F, String, A].flatMap { store => - (Deferred[F, Unit], Ref.of[F, List[Fiber[F, Throwable, Unit]]](List.empty)).tupled.flatMap { - case (gate, fbs) => - t.eval(redis.multi) - .bracketCase { _ => - fs(store) - .traverse_(f => t.start(f).flatMap(fb => fbs.update(_ :+ fb))) - .guarantee(gate.complete(()).void) - } { - case (_, Outcome.Succeeded(_)) => - gate.get *> t.eval(redis.exec).guarantee(fbs.get.flatMap(_.traverse_(_.join))) - case (_, _) => - t.eval(redis.discard).guarantee(fbs.get.flatMap(_.traverse_(_.cancel))) - } - } *> store.get - } + def exec(fs: List[F[Unit]]): F[Unit] = + run((_: TxStore[F, String, String]) => fs).void + + def run[A](fs: TxStore[F, String, A] => List[F[Unit]]): F[Map[String, A]] = + TxRunner.run[F, K, V, A]( + acquire = redis.multi, + release = redis.exec, + onError = redis.discard, + t = txe + )(fs) } } } diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/TxRunner.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/TxRunner.scala new file mode 100644 index 00000000..29567dc0 --- /dev/null +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/TxRunner.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2018-2021 ProfunKtor + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.profunktor.redis4cats.tx + +import cats.effect.kernel._ +import cats.effect.kernel.syntax.all._ +import cats.syntax.all._ + +import dev.profunktor.redis4cats.effect.TxExecutor + +private[redis4cats] object TxRunner { + private[redis4cats] def run[F[_]: Async, K, V, A]( + acquire: F[Unit], + release: F[Unit], + onError: F[Unit], + t: TxExecutor[F] + )( + fs: TxStore[F, String, A] => List[F[Unit]] + ): F[Map[String, A]] = + TxStore.make[F, String, A].flatMap { store => + (Deferred[F, Unit], Ref.of[F, List[Fiber[F, Throwable, Unit]]](List.empty)).tupled.flatMap { + case (gate, fbs) => + t.eval(acquire) + .bracketCase { _ => + fs(store) + .traverse_(f => t.start(f).flatMap(fb => fbs.update(_ :+ fb))) + .guarantee(gate.complete(()).void) + } { + case (_, Outcome.Succeeded(_)) => + gate.get *> t.eval(release).guarantee(fbs.get.flatMap(_.traverse_(_.join))) + case (_, _) => + t.eval(onError).guarantee(fbs.get.flatMap(_.traverse_(_.cancel))) + } + } *> store.get + } +} diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/TxStore.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/TxStore.scala new file mode 100644 index 00000000..be2362d1 --- /dev/null +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/TxStore.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2018-2021 ProfunKtor + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.profunktor.redis4cats.tx + +import cats.effect.kernel.{ Async, Ref } +import cats.syntax.functor._ + +/** + * Provides a way to store transactional results for later retrieval. + */ +trait TxStore[F[_], K, V] { + def get: F[Map[K, V]] + def set(key: K)(v: V): F[Unit] +} + +object TxStore { + private[redis4cats] def make[F[_]: Async, K, V]: F[TxStore[F, K, V]] = + Ref.of[F, Map[K, V]](Map.empty).map { ref => + new TxStore[F, K, V] { + def get: F[Map[K, V]] = ref.get + def set(key: K)(v: V): F[Unit] = ref.update(_.updated(key, v)) + } + } +} diff --git a/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/package.scala b/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/package.scala index df832368..231c393f 100644 --- a/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/package.scala +++ b/modules/effects/src/main/scala/dev/profunktor/redis4cats/tx/package.scala @@ -19,5 +19,6 @@ package dev.profunktor.redis4cats import scala.util.control.NoStackTrace package object tx { + case object PipelineError extends NoStackTrace case object TransactionDiscarded extends NoStackTrace } diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPipelineDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPipelineDemo.scala index 56f9f5a0..4bb6b5c0 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPipelineDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisPipelineDemo.scala @@ -17,10 +17,9 @@ package dev.profunktor.redis4cats import cats.effect._ +import cats.syntax.all._ import dev.profunktor.redis4cats.effect.Log.NoOp._ -import dev.profunktor.redis4cats.hlist._ -import dev.profunktor.redis4cats.pipeline._ -import java.util.concurrent.TimeoutException +import dev.profunktor.redis4cats.tx._ object RedisPipelineDemo extends LoggerIOApp { @@ -36,34 +35,33 @@ object RedisPipelineDemo extends LoggerIOApp { val commandsApi: Resource[IO, RedisCommands[IO, String, String]] = Redis[IO].utf8(redisURI) - commandsApi - .use { cmd => - val getters = - cmd.get(key1).flatTap(showResult(key1)) *> - cmd.get(key2).flatTap(showResult(key2)) + commandsApi.use { redis => + val getters = + redis.get(key1).flatTap(showResult(key1)) *> + redis.get(key2).flatTap(showResult(key2)) - val operations = - cmd.set(key1, "noop") :: cmd.set(key2, "windows") :: cmd.get(key1) :: - cmd.set(key1, "nix") :: cmd.set(key2, "linux") :: cmd.get(key1) :: HNil + val operations = (store: TxStore[IO, String, Option[String]]) => + List( + redis.set(key1, "noop"), + redis.set(key2, "windows"), + redis.get(key1).flatMap(store.set(s"$key1-v1")), + redis.set(key1, "nix"), + redis.set(key2, "linux"), + redis.get(key1).flatMap(store.set(s"$key1-v2")) + ) - val prog = - RedisPipeline(cmd) - .filterExec(operations) - .flatMap { - case res1 ~: res2 ~: HNil => - putStrLn(s"res1: $res1, res2: $res2") - } - .onError { - case PipelineError => - putStrLn("[Error] - Pipeline failed") - case _: TimeoutException => - putStrLn("[Error] - Timeout") + val prog = + RedisPipe.make(redis).use { + _.run(operations) + .flatMap(kv => IO.println(s"KV: $kv")) + .recoverWith { case e => putStrLn(s"[Error] - ${e.getMessage}") } + } - getters >> prog >> getters >> putStrLn("keep doing stuff...") - } + getters >> prog >> getters >> putStrLn("keep doing stuff...") + } } } diff --git a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTxDemo.scala b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTxDemo.scala index 653f643e..800fb50e 100644 --- a/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTxDemo.scala +++ b/modules/examples/src/main/scala/dev/profunktor/redis4cats/RedisTxDemo.scala @@ -20,7 +20,7 @@ import cats.effect._ import dev.profunktor.redis4cats.connection.RedisClient import dev.profunktor.redis4cats.data.RedisCodec import dev.profunktor.redis4cats.log4cats._ -import dev.profunktor.redis4cats.tx.{ RedisTx, TransactionDiscarded } +import dev.profunktor.redis4cats.tx._ object RedisTxDemo extends LoggerIOApp { @@ -42,7 +42,7 @@ object RedisTxDemo extends LoggerIOApp { def prog[A]( tx: RedisTx[IO], - ops: RedisTx.Store[IO, String, A] => List[IO[Unit]] + ops: TxStore[IO, String, A] => List[IO[Unit]] ): IO[Unit] = tx.run(ops) // or tx.exec(ops) to discard the result .flatMap(kv => IO.println(s"KV: $kv")) @@ -63,7 +63,7 @@ object RedisTxDemo extends LoggerIOApp { // it is not possible to mix different stores. In case of needing to preserve values // of other types, you'd need to use a local Ref or so. - val ops = (store: RedisTx.Store[IO, String, Option[String]]) => + val ops = (store: TxStore[IO, String, Option[String]]) => List( redis.set(key1, "sad"), redis.set(key2, "windows"), @@ -79,7 +79,7 @@ object RedisTxDemo extends LoggerIOApp { val p2 = mkRedis(cli).use { redis => RedisTx.make(redis).use { tx => - val ops = (store: RedisTx.Store[IO, String, Long]) => + val ops = (store: TxStore[IO, String, Long]) => List( redis.set("yo", "wat"), redis.incr(key3).flatMap(store.set(s"$key3-v1")), diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala index cb1c09dd..73ac950e 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisSpec.scala @@ -40,15 +40,12 @@ class RedisSpec extends Redis4CatsFunSuite(false) with TestScenarios { test("connection api")(withRedis(connectionScenario)) - // TODO: re-enable once the implementation is fixed as RedisTx - //test("pipelining")(withRedis(pipelineScenario)) + test("pipelining")(withRedis(pipelineScenario)) test("server")(withRedis(serverScenario)) test("transactions: successful")(withRedis(transactionScenario)) - //test("transactions: canceled")(withRedis(canceledTransactionScenario)) - test("scripts")(withRedis(scriptsScenario)) test("hyperloglog api")(withRedis(hyperloglogScenario)) diff --git a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala index 3ae59045..4db7b0ba 100644 --- a/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala +++ b/modules/tests/src/test/scala/dev/profunktor/redis4cats/TestScenarios.scala @@ -17,18 +17,16 @@ package dev.profunktor.redis4cats import java.time.Instant -import java.util.concurrent.TimeoutException + import cats.data.NonEmptyList import cats.effect._ import cats.implicits._ + import dev.profunktor.redis4cats.algebra.BitCommandOperation.{ IncrUnsignedBy, SetUnsigned } import dev.profunktor.redis4cats.algebra.BitCommands import dev.profunktor.redis4cats.data.KeyScanCursor -import dev.profunktor.redis4cats.effect.Log.NoOp._ import dev.profunktor.redis4cats.effects._ -import dev.profunktor.redis4cats.hlist._ -import dev.profunktor.redis4cats.pipeline.{ PipelineError, RedisPipeline } -import dev.profunktor.redis4cats.tx.RedisTx +import dev.profunktor.redis4cats.tx._ import io.lettuce.core.{ GeoArgs, ZAggregateArgs } import munit.FunSuite @@ -36,7 +34,7 @@ import scala.concurrent.duration._ trait TestScenarios { self: FunSuite => - def locationScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + def locationScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val _BuenosAires = GeoLocation(Longitude(-58.3816), Latitude(-34.6037), "Buenos Aires") val _RioDeJaneiro = GeoLocation(Longitude(-43.1729), Latitude(-22.9068), "Rio de Janeiro") val _Montevideo = GeoLocation(Longitude(-56.164532), Latitude(-34.901112), "Montevideo") @@ -44,120 +42,120 @@ trait TestScenarios { self: FunSuite => val testKey = "location" for { - _ <- cmd.geoAdd(testKey, _BuenosAires) - _ <- cmd.geoAdd(testKey, _RioDeJaneiro) - _ <- cmd.geoAdd(testKey, _Montevideo) - _ <- cmd.geoAdd(testKey, _Tokyo) - x <- cmd.geoDist(testKey, _BuenosAires.value, _Tokyo.value, GeoArgs.Unit.km) + _ <- redis.geoAdd(testKey, _BuenosAires) + _ <- redis.geoAdd(testKey, _RioDeJaneiro) + _ <- redis.geoAdd(testKey, _Montevideo) + _ <- redis.geoAdd(testKey, _Tokyo) + x <- redis.geoDist(testKey, _BuenosAires.value, _Tokyo.value, GeoArgs.Unit.km) _ <- IO(assertEquals(x, 18374.9052)) - y <- cmd.geoPos(testKey, _RioDeJaneiro.value) + y <- redis.geoPos(testKey, _RioDeJaneiro.value) _ <- IO(assert(y.contains(GeoCoordinate(-43.17289799451828, -22.906801071586663)))) - z <- cmd.geoRadius(testKey, GeoRadius(_Montevideo.lon, _Montevideo.lat, Distance(10000.0)), GeoArgs.Unit.km) + z <- redis.geoRadius(testKey, GeoRadius(_Montevideo.lon, _Montevideo.lat, Distance(10000.0)), GeoArgs.Unit.km) _ <- IO(assert(z.toList.containsSlice(List(_BuenosAires.value, _Montevideo.value, _RioDeJaneiro.value)))) } yield () } - def hashesScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + def hashesScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val testKey = "foo" val testField = "bar" val testField2 = "baz" for { - x <- cmd.hGet(testKey, testField) + x <- redis.hGet(testKey, testField) _ <- IO(assert(x.isEmpty)) - isSet1 <- cmd.hSetNx(testKey, testField, "some value") + isSet1 <- redis.hSetNx(testKey, testField, "some value") _ <- IO(assert(isSet1)) - y <- cmd.hGet(testKey, testField) + y <- redis.hGet(testKey, testField) _ <- IO(assert(y.contains("some value"))) - isSet2 <- cmd.hSetNx(testKey, testField, "should not happen") + isSet2 <- redis.hSetNx(testKey, testField, "should not happen") _ <- IO(assert(!isSet2)) - w <- cmd.hGet(testKey, testField) + w <- redis.hGet(testKey, testField) _ <- IO(assert(w.contains("some value"))) - w <- cmd.hmGet(testKey, testField, testField2) + w <- redis.hmGet(testKey, testField, testField2) _ <- IO(assertEquals(w, Map(testField -> "some value"))) - d <- cmd.hDel(testKey, testField) + d <- redis.hDel(testKey, testField) _ <- IO(assertEquals(d, 1L)) - z <- cmd.hGet(testKey, testField) + z <- redis.hGet(testKey, testField) _ <- IO(assert(z.isEmpty)) - _ <- cmd.hSet(testKey, Map(testField -> "some value", testField2 -> "another value")) - v <- cmd.hGet(testKey, testField) + _ <- redis.hSet(testKey, Map(testField -> "some value", testField2 -> "another value")) + v <- redis.hGet(testKey, testField) _ <- IO(assert(v.contains("some value"))) - v <- cmd.hGet(testKey, testField2) + v <- redis.hGet(testKey, testField2) _ <- IO(assert(v.contains("another value"))) } yield () } - def listsScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + def listsScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val testKey = "listos" for { - first1 <- cmd.blPop(1.second, NonEmptyList.one(testKey)) + first1 <- redis.blPop(1.second, NonEmptyList.one(testKey)) _ <- IO(assert(first1.isEmpty)) - last1 <- cmd.brPop(1.second, NonEmptyList.one(testKey)) + last1 <- redis.brPop(1.second, NonEmptyList.one(testKey)) _ <- IO(assert(last1.isEmpty)) - pLength1 <- cmd.rPush(testKey, "one", "two") + pLength1 <- redis.rPush(testKey, "one", "two") _ <- IO(assert(pLength1 === 2)) - last2 <- cmd.brPop(1.second, NonEmptyList.one(testKey)) + last2 <- redis.brPop(1.second, NonEmptyList.one(testKey)) _ <- IO(assert(last2.contains((testKey, "two")))) - first2 <- cmd.blPop(1.second, NonEmptyList.one(testKey)) + first2 <- redis.blPop(1.second, NonEmptyList.one(testKey)) _ <- IO(assert(first2.contains((testKey, "one")))) - t <- cmd.lRange(testKey, 0, 10) + t <- redis.lRange(testKey, 0, 10) _ <- IO(assert(t.isEmpty)) - pLength2 <- cmd.rPush(testKey, "one", "two", "three") + pLength2 <- redis.rPush(testKey, "one", "two", "three") _ <- IO(assert(pLength2 === 3)) - x <- cmd.lRange(testKey, 0, 10) + x <- redis.lRange(testKey, 0, 10) _ <- IO(assertEquals(x, List("one", "two", "three"))) - y <- cmd.lLen(testKey) + y <- redis.lLen(testKey) _ <- IO(assert(y.contains(3))) - a <- cmd.lPop(testKey) + a <- redis.lPop(testKey) _ <- IO(assert(a.contains("one"))) - b <- cmd.rPop(testKey) + b <- redis.rPop(testKey) _ <- IO(assert(b.contains("three"))) - z <- cmd.lRange(testKey, 0, 10) + z <- redis.lRange(testKey, 0, 10) _ <- IO(assertEquals(z, List("two"))) - c <- cmd.lInsertAfter(testKey, "two", "three") + c <- redis.lInsertAfter(testKey, "two", "three") _ <- IO(assertEquals(c, 2L)) - d <- cmd.lInsertBefore(testKey, "n/a", "one") + d <- redis.lInsertBefore(testKey, "n/a", "one") _ <- IO(assertEquals(d, -1L)) - e <- cmd.lInsertBefore(testKey, "two", "one") + e <- redis.lInsertBefore(testKey, "two", "one") _ <- IO(assertEquals(e, 3L)) - f <- cmd.lRange(testKey, 0, 10) + f <- redis.lRange(testKey, 0, 10) _ <- IO(assertEquals(f, List("one", "two", "three"))) - g <- cmd.lRem(testKey, 0, "one") + g <- redis.lRem(testKey, 0, "one") _ <- IO(assertEquals(g, 1L)) - _ <- cmd.lSet(testKey, 1, "four") - _ <- cmd.lTrim(testKey, 1, 2) - h <- cmd.lRange(testKey, 0, 10) + _ <- redis.lSet(testKey, 1, "four") + _ <- redis.lTrim(testKey, 1, 2) + h <- redis.lRange(testKey, 0, 10) _ <- IO(assertEquals(h, List("four"))) } yield () } - def setsScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + def setsScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val testKey = "foos" for { - x <- cmd.sMembers(testKey) + x <- redis.sMembers(testKey) _ <- IO(assert(x.isEmpty)) - a <- cmd.sAdd(testKey, "set value") + a <- redis.sAdd(testKey, "set value") _ <- IO(assertEquals(a, 1L)) - b <- cmd.sAdd(testKey, "set value") + b <- redis.sAdd(testKey, "set value") _ <- IO(assertEquals(b, 0L)) - y <- cmd.sMembers(testKey) + y <- redis.sMembers(testKey) _ <- IO(assert(y.contains("set value"))) - o <- cmd.sCard(testKey) + o <- redis.sCard(testKey) _ <- IO(assertEquals(o, 1L)) - _ <- cmd.sRem("non-existing", "random") - w <- cmd.sMembers(testKey) + _ <- redis.sRem("non-existing", "random") + w <- redis.sMembers(testKey) _ <- IO(assert(w.contains("set value"))) - _ <- cmd.sRem(testKey, "set value") - z <- cmd.sMembers(testKey) + _ <- redis.sRem(testKey, "set value") + z <- redis.sMembers(testKey) _ <- IO(assert(z.isEmpty)) - t <- cmd.sCard(testKey) + t <- redis.sCard(testKey) _ <- IO(assertEquals(t, 0L)) - _ <- cmd.sAdd(testKey, "value 1", "value 2") - r <- cmd.sMisMember(testKey, "value 1", "random", "value 2") + _ <- redis.sAdd(testKey, "value 1", "value 2") + r <- redis.sMisMember(testKey, "value 1", "random", "value 2") _ <- IO(assertEquals(r, List(true, false, true))) } yield () } - def sortedSetsScenario(cmd: RedisCommands[IO, String, Long]): IO[Unit] = { + def sortedSetsScenario(redis: RedisCommands[IO, String, Long]): IO[Unit] = { val testKey = "{same_hash_slot}:zztop" val otherTestKey = "{same_hash_slot}:sharp:dressed:man" val scoreWithValue1 = ScoreWithValue(Score(1), 1L) @@ -165,41 +163,41 @@ trait TestScenarios { self: FunSuite => val scoreWithValue3 = ScoreWithValue(Score(5), 3L) val timeout = 1.second for { - minPop1 <- cmd.zPopMin(testKey, 1) + minPop1 <- redis.zPopMin(testKey, 1) _ <- IO(assert(minPop1.isEmpty)) - maxPop1 <- cmd.zPopMax(testKey, 1) + maxPop1 <- redis.zPopMax(testKey, 1) _ <- IO(assert(maxPop1.isEmpty)) - minBPop1 <- cmd.bzPopMin(timeout, NonEmptyList.one(testKey)) + minBPop1 <- redis.bzPopMin(timeout, NonEmptyList.one(testKey)) _ <- IO(assert(minBPop1.isEmpty)) - maxBPop1 <- cmd.bzPopMax(timeout, NonEmptyList.one(testKey)) + maxBPop1 <- redis.bzPopMax(timeout, NonEmptyList.one(testKey)) _ <- IO(assert(maxBPop1.isEmpty)) - t <- cmd.zRevRangeByScore(testKey, ZRange(0, 2), limit = None) + t <- redis.zRevRangeByScore(testKey, ZRange(0, 2), limit = None) _ <- IO(assert(t.isEmpty)) - add2 <- cmd.zAdd(testKey, args = None, scoreWithValue1, scoreWithValue2) + add2 <- redis.zAdd(testKey, args = None, scoreWithValue1, scoreWithValue2) _ <- IO(assertEquals(add2, 2L)) - minPop2 <- cmd.zPopMin(testKey, 1) + minPop2 <- redis.zPopMin(testKey, 1) _ <- IO(assertEquals(minPop2, List(scoreWithValue1))) - maxPop2 <- cmd.zPopMax(testKey, 1) + maxPop2 <- redis.zPopMax(testKey, 1) _ <- IO(assertEquals(maxPop2, List(scoreWithValue2))) - _ <- cmd.zCard(testKey).map(card => assert(card.contains(0))) - _ <- cmd.zAdd(testKey, args = None, scoreWithValue1, scoreWithValue2) - minBPop2 <- cmd.bzPopMin(timeout, NonEmptyList.one(testKey)) + _ <- redis.zCard(testKey).map(card => assert(card.contains(0))) + _ <- redis.zAdd(testKey, args = None, scoreWithValue1, scoreWithValue2) + minBPop2 <- redis.bzPopMin(timeout, NonEmptyList.one(testKey)) _ <- IO(assert(minBPop2.contains((testKey, scoreWithValue1)))) - maxBPop2 <- cmd.bzPopMax(timeout, NonEmptyList.one(testKey)) + maxBPop2 <- redis.bzPopMax(timeout, NonEmptyList.one(testKey)) _ <- IO(assert(maxBPop2.contains((testKey, scoreWithValue2)))) - _ <- cmd.zCard(testKey).map(card => assert(card.contains(0))) - _ <- cmd.zAdd(testKey, args = None, scoreWithValue1, scoreWithValue2) - x <- cmd.zRevRangeByScore(testKey, ZRange(0, 2), limit = None) + _ <- redis.zCard(testKey).map(card => assert(card.contains(0))) + _ <- redis.zAdd(testKey, args = None, scoreWithValue1, scoreWithValue2) + x <- redis.zRevRangeByScore(testKey, ZRange(0, 2), limit = None) _ <- IO(assertEquals(x, List(1L))) - y <- cmd.zCard(testKey) + y <- redis.zCard(testKey) _ <- IO(assert(y.contains(2))) - z <- cmd.zCount(testKey, ZRange(0, 1)) + z <- redis.zCount(testKey, ZRange(0, 1)) _ <- IO(assert(z.contains(1))) - _ <- cmd.zAdd(otherTestKey, args = None, scoreWithValue1, scoreWithValue3) - zUnion <- cmd.zUnion(args = None, testKey, otherTestKey) + _ <- redis.zAdd(otherTestKey, args = None, scoreWithValue1, scoreWithValue3) + zUnion <- redis.zUnion(args = None, testKey, otherTestKey) _ <- IO(assertEquals(zUnion, List(1L, 2L, 3L))) aggregateArgs = ZAggregateArgs.Builder.sum().weights(10L, 20L) - zUnionWithScoreAndArgs <- cmd.zUnionWithScores(Some(aggregateArgs), testKey, otherTestKey) + zUnionWithScoreAndArgs <- redis.zUnionWithScores(Some(aggregateArgs), testKey, otherTestKey) _ <- IO( assertEquals( zUnionWithScoreAndArgs, @@ -207,92 +205,92 @@ trait TestScenarios { self: FunSuite => List(ScoreWithValue(Score(30), 1L), ScoreWithValue(Score(30), 2L), ScoreWithValue(Score(100), 3L)) ) ) - zInter <- cmd.zInter(args = None, testKey, otherTestKey) + zInter <- redis.zInter(args = None, testKey, otherTestKey) _ <- IO(assertEquals(zInter, List(1L))) - zDiff <- cmd.zDiff(testKey, otherTestKey) + zDiff <- redis.zDiff(testKey, otherTestKey) _ <- IO(assertEquals(zDiff, List(2L))) - r <- cmd.zRemRangeByScore(testKey, ZRange(1, 3)) + r <- redis.zRemRangeByScore(testKey, ZRange(1, 3)) _ <- IO(assertEquals(r, 2L)) } yield () } - def keysScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + def keysScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val key1 = "key1" val key2 = "key2" for { - x <- cmd.get(key1) + x <- redis.get(key1) _ <- IO(assertEquals(x, None)) - exist1 <- cmd.exists(key1) + exist1 <- redis.exists(key1) _ <- IO(assert(!exist1)) - idletime1 <- cmd.objectIdletime(key1) + idletime1 <- redis.objectIdletime(key1) _ <- IO(assert(idletime1.isEmpty)) - _ <- cmd.set(key1, "some value") - exist2 <- cmd.exists(key1) + _ <- redis.set(key1, "some value") + exist2 <- redis.exists(key1) _ <- IO(assert(exist2)) - idletime2 <- cmd.objectIdletime(key1) + idletime2 <- redis.objectIdletime(key1) _ <- IO(assert(idletime2.isDefined)) - _ <- cmd.mSet(Map(key2 -> "some value 2")) - exist3 <- cmd.exists(key1, key2) + _ <- redis.mSet(Map(key2 -> "some value 2")) + exist3 <- redis.exists(key1, key2) _ <- IO(assert(exist3)) - exist4 <- cmd.exists(key1, key2, "_not_existing_key_") + exist4 <- redis.exists(key1, key2, "_not_existing_key_") _ <- IO(assert(!exist4)) - g <- cmd.del(key1) + g <- redis.del(key1) _ <- IO(assertEquals(g, 1L)) - exist5 <- cmd.exists(key1) + exist5 <- redis.exists(key1) _ <- IO(assert(!exist5)) - a <- cmd.ttl("whatever+") + a <- redis.ttl("whatever+") _ <- IO(assert(a.isEmpty)) - b <- cmd.pttl("whatever+") + b <- redis.pttl("whatever+") _ <- IO(assert(b.isEmpty)) - _ <- cmd.set("f1", "bar") - h <- cmd.expire("f1", 10.seconds) + _ <- redis.set("f1", "bar") + h <- redis.expire("f1", 10.seconds) _ <- IO(assertEquals(h, true)) - c <- cmd.ttl("f1") + c <- redis.ttl("f1") _ <- IO(assert(c.nonEmpty)) - d <- cmd.pttl("f1") + d <- redis.pttl("f1") _ <- IO(assert(d.nonEmpty)) - _ <- cmd.set("f2", "yay") - i <- cmd.expire("f2", 50.millis) + _ <- redis.set("f2", "yay") + i <- redis.expire("f2", 50.millis) _ <- IO(assertEquals(i, true)) - e <- cmd.ttl("f2") + e <- redis.ttl("f2") _ <- IO(assert(e.nonEmpty)) _ <- IO.sleep(50.millis) - f <- cmd.ttl("f2") + f <- redis.ttl("f2") _ <- IO(assert(f.isEmpty)) - j <- cmd.expire("_not_existing_key_", 50.millis) + j <- redis.expire("_not_existing_key_", 50.millis) _ <- IO(assertEquals(j, false)) - _ <- cmd.del("f1") + _ <- redis.del("f1") } yield () } - def scanScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + def scanScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val keys = (1 until 10).map("key" + _).sorted.toList for { - _ <- cmd.mSet(keys.map(key => (key, key + "#value")).toMap) - scan0 <- cmd.scan + _ <- redis.mSet(keys.map(key => (key, key + "#value")).toMap) + scan0 <- redis.scan _ <- IO(assertEquals(scan0.cursor, "0")) _ <- IO(assertEquals(scan0.keys.sorted, keys)) - scan1 <- cmd.scan(ScanArgs(1)) + scan1 <- redis.scan(ScanArgs(1)) _ <- IO(assert(scan1.keys.nonEmpty, "read at least something but no hard requirement")) _ <- IO(assert(scan1.keys.size < keys.size, "but read less than all of them")) - scan2 <- cmd.scan(scan1, ScanArgs("key*")) + scan2 <- redis.scan(scan1, ScanArgs("key*")) _ <- IO(assertEquals(scan2.cursor, "0")) _ <- IO(assertEquals((scan1.keys ++ scan2.keys).sorted, keys, "read to the end in result")) } yield () } - def clusterScanScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + def clusterScanScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val keys = (1 to 10).map("key" + _).sorted.toList for { - _ <- cmd.mSet(keys.map(key => (key, key + "#value")).toMap) - tp <- clusterScan(cmd, args = None) + _ <- redis.mSet(keys.map(key => (key, key + "#value")).toMap) + tp <- clusterScan(redis, args = None) (keys0, iterations0) = tp _ <- IO(assertEquals(keys0.sorted, keys)) - tp <- clusterScan(cmd, args = Some(ScanArgs("key*"))) + tp <- clusterScan(redis, args = Some(ScanArgs("key*"))) (keys1, iterations1) = tp _ <- IO(assertEquals(keys1.sorted, keys)) _ <- IO(assertEquals(iterations1, iterations0)) - tp <- clusterScan(cmd, args = Some(ScanArgs(1))) + tp <- clusterScan(redis, args = Some(ScanArgs(1))) (keys2, iterations2) = tp _ <- IO(assertEquals(keys2.sorted, keys)) _ <- IO(assert(iterations2 > iterations0, "made more iterations because of limit")) @@ -305,54 +303,54 @@ trait TestScenarios { self: FunSuite => * Does scan on all cluster nodes until all keys collected since order of scanned nodes can't be guaranteed */ private def clusterScan( - cmd: RedisCommands[IO, String, String], + redis: RedisCommands[IO, String, String], args: Option[ScanArgs] ): IO[(List[String], Iterations)] = { def scanRec(previous: KeyScanCursor[String], acc: List[String], cnt: Int): IO[(List[String], Iterations)] = if (previous.isFinished) IO.pure((previous.keys ++ acc, cnt)) else - args.fold(cmd.scan(previous))(cmd.scan(previous, _)).flatMap { + args.fold(redis.scan(previous))(redis.scan(previous, _)).flatMap { scanRec(_, previous.keys ++ acc, cnt + 1) } - args.fold(cmd.scan)(cmd.scan).flatMap(scanRec(_, List.empty, 0)) + args.fold(redis.scan)(redis.scan).flatMap(scanRec(_, List.empty, 0)) } - def bitmapsScenario(cmd: BitCommands[IO, String, String]): IO[Unit] = { + def bitmapsScenario(redis: BitCommands[IO, String, String]): IO[Unit] = { val key = "foo" val secondKey = "bar" val thirdKey = "baz" for { - _ <- cmd.setBit(key, 0, 1) - oneBit <- cmd.getBit(key, 0) + _ <- redis.setBit(key, 0, 1) + oneBit <- redis.getBit(key, 0) _ <- IO(assertEquals(oneBit, Some(1.toLong))) - _ <- cmd.setBit(key, 1, 1) - bitLen <- cmd.bitCount(key) + _ <- redis.setBit(key, 1, 1) + bitLen <- redis.bitCount(key) _ <- IO(assertEquals(bitLen, 2.toLong)) - bitLen2 <- cmd.bitCount(key, 0, 2) + bitLen2 <- redis.bitCount(key, 0, 2) _ <- IO(assertEquals(bitLen2, 2.toLong)) - _ <- cmd.setBit(key, 0, 1) - _ <- cmd.setBit(secondKey, 0, 1) - _ <- cmd.bitOpAnd(thirdKey, key, secondKey) - r <- cmd.getBit(thirdKey, 0) + _ <- redis.setBit(key, 0, 1) + _ <- redis.setBit(secondKey, 0, 1) + _ <- redis.bitOpAnd(thirdKey, key, secondKey) + r <- redis.getBit(thirdKey, 0) _ <- IO(assertEquals(r, Some(1.toLong))) - _ <- cmd.bitOpNot(thirdKey, key) - r2 <- cmd.getBit(thirdKey, 0) + _ <- redis.bitOpNot(thirdKey, key) + r2 <- redis.getBit(thirdKey, 0) _ <- IO(assertEquals(r2, Some(0.toLong))) - _ <- cmd.bitOpOr(thirdKey, key, secondKey) - r3 <- cmd.getBit(thirdKey, 0) + _ <- redis.bitOpOr(thirdKey, key, secondKey) + r3 <- redis.getBit(thirdKey, 0) _ <- IO(assertEquals(r3, Some(1.toLong))) _ <- for { - s1 <- cmd.setBit(key, 2, 1) - s2 <- cmd.setBit(key, 3, 1) - s3 <- cmd.setBit(key, 5, 1) - s4 <- cmd.setBit(key, 10, 1) - s5 <- cmd.setBit(key, 11, 1) - s6 <- cmd.setBit(key, 14, 1) + s1 <- redis.setBit(key, 2, 1) + s2 <- redis.setBit(key, 3, 1) + s3 <- redis.setBit(key, 5, 1) + s4 <- redis.setBit(key, 10, 1) + s5 <- redis.setBit(key, 11, 1) + s6 <- redis.setBit(key, 14, 1) } yield s1 + s2 + s3 + s4 + s5 + s6 - k <- cmd.getBit(key, 2) + k <- redis.getBit(key, 2) _ <- IO(assertEquals(k, Some(1.toLong))) - _ <- cmd.bitField( + _ <- redis.bitField( secondKey, SetUnsigned(2, 1), SetUnsigned(3, 1), @@ -361,126 +359,126 @@ trait TestScenarios { self: FunSuite => SetUnsigned(11, 1), IncrUnsignedBy(14, 1) ) - bits <- 0.to(14).toList.traverse(offset => cmd.getBit(secondKey, offset.toLong)) + bits <- 0.to(14).toList.traverse(offset => redis.getBit(secondKey, offset.toLong)) number <- IO.pure(Integer.parseInt(bits.map(_.getOrElse(0).toString).foldLeft("")(_ + _), 2)) _ <- IO(assertEquals(number, 23065)) - pos <- cmd.bitPos(key, state = false) + pos <- redis.bitPos(key, state = false) _ <- IO(assertEquals(pos, 4.toLong)) } yield () } - def stringsScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + def stringsScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val key = "test" for { - x <- cmd.get(key) + x <- redis.get(key) _ <- IO(assert(x.isEmpty)) - isSet1 <- cmd.setNx(key, "some value") + isSet1 <- redis.setNx(key, "some value") _ <- IO(assert(isSet1)) - y <- cmd.get(key) + y <- redis.get(key) _ <- IO(assert(y.contains("some value"))) - isSet2 <- cmd.setNx(key, "should not happen") + isSet2 <- redis.setNx(key, "should not happen") _ <- IO(assert(!isSet2)) - isSet3 <- cmd.mSetNx(Map("multikey1" -> "someVal1", "multikey2" -> "someVal2")) + isSet3 <- redis.mSetNx(Map("multikey1" -> "someVal1", "multikey2" -> "someVal2")) _ <- IO(assert(isSet3)) - isSet4 <- cmd.mSetNx(Map("multikey1" -> "someVal0", "multikey3" -> "someVal3")) + isSet4 <- redis.mSetNx(Map("multikey1" -> "someVal0", "multikey3" -> "someVal3")) _ <- IO(assert(!isSet4)) - val1 <- cmd.get("multikey1") + val1 <- redis.get("multikey1") _ <- IO(assert(val1.contains("someVal1"))) - val3 <- cmd.get("multikey3") + val3 <- redis.get("multikey3") _ <- IO(assert(val3.isEmpty)) - isSet5 <- cmd.mSetNx(Map("multikey1" -> "someVal1", "multikey2" -> "someVal2")) + isSet5 <- redis.mSetNx(Map("multikey1" -> "someVal1", "multikey2" -> "someVal2")) _ <- IO(assert(!isSet5)) - w <- cmd.get(key) + w <- redis.get(key) _ <- IO(assert(w.contains("some value"))) - isSet6 <- cmd.set(key, "some value", SetArgs(SetArg.Existence.Nx)) + isSet6 <- redis.set(key, "some value", SetArgs(SetArg.Existence.Nx)) _ <- IO(assert(!isSet6)) - isSet7 <- cmd.set(key, "some value 2", SetArgs(SetArg.Existence.Xx)) + isSet7 <- redis.set(key, "some value 2", SetArgs(SetArg.Existence.Xx)) _ <- IO(assert(isSet7)) - val4 <- cmd.get(key) + val4 <- redis.get(key) _ <- IO(assert(val4.contains("some value 2"))) - _ <- cmd.del(key) - isSet8 <- cmd.set(key, "some value", SetArgs(SetArg.Existence.Xx)) + _ <- redis.del(key) + isSet8 <- redis.set(key, "some value", SetArgs(SetArg.Existence.Xx)) _ <- IO(assert(!isSet8)) - isSet9 <- cmd.set(key, "some value", SetArgs(SetArg.Existence.Nx)) + isSet9 <- redis.set(key, "some value", SetArgs(SetArg.Existence.Nx)) _ <- IO(assert(isSet9)) - val5 <- cmd.get(key) + val5 <- redis.get(key) _ <- IO(assert(val5.contains("some value"))) - isSet10 <- cmd.set(key, "some value 2", SetArgs(None, None)) + isSet10 <- redis.set(key, "some value 2", SetArgs(None, None)) _ <- IO(assert(isSet10)) - val6 <- cmd.get(key) + val6 <- redis.get(key) _ <- IO(assert(val6.contains("some value 2"))) - _ <- cmd.del(key) - z <- cmd.get(key) + _ <- redis.del(key) + z <- redis.get(key) _ <- IO(assert(z.isEmpty)) } yield () } - def stringsClusterScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + def stringsClusterScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val key = "test" for { - x <- cmd.get(key) + x <- redis.get(key) _ <- IO(assert(x.isEmpty)) - isSet1 <- cmd.setNx(key, "some value") + isSet1 <- redis.setNx(key, "some value") _ <- IO(assert(isSet1)) - y <- cmd.get(key) + y <- redis.get(key) _ <- IO(assert(y.contains("some value"))) - isSet2 <- cmd.setNx(key, "should not happen") + isSet2 <- redis.setNx(key, "should not happen") _ <- IO(assert(!isSet2)) - w <- cmd.get(key) + w <- redis.get(key) _ <- IO(assert(w.contains("some value"))) - _ <- cmd.del(key) - z <- cmd.get(key) + _ <- redis.del(key) + z <- redis.get(key) _ <- IO(assert(z.isEmpty)) } yield () } - def connectionScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = - cmd.ping.flatMap(pong => IO(assertEquals(pong, "PONG"))).void + def connectionScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = + redis.ping.flatMap(pong => IO(assertEquals(pong, "PONG"))).void - def serverScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = + def serverScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = for { - _ <- cmd.mSet(Map("firstname" -> "Jack", "lastname" -> "Stuntman", "age" -> "35")) - names <- cmd.keys("*name*").map(_.toSet) + _ <- redis.mSet(Map("firstname" -> "Jack", "lastname" -> "Stuntman", "age" -> "35")) + names <- redis.keys("*name*").map(_.toSet) _ <- IO(assertEquals(names, Set("firstname", "lastname"))) - age <- cmd.keys("a??") + age <- redis.keys("a??") _ <- IO(assertEquals(age, List("age"))) - info <- cmd.info + info <- redis.info _ <- IO(assert(info.contains("role"))) - dbsize <- cmd.dbsize + dbsize <- redis.dbsize _ <- IO(assert(dbsize > 0)) - lastSave <- cmd.lastSave + lastSave <- redis.lastSave _ <- IO(assert(lastSave.isBefore(Instant.now))) - slowLogLen <- cmd.slowLogLen + slowLogLen <- redis.slowLogLen _ <- IO(assert(slowLogLen.isValidLong)) } yield () - def pipelineScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + def pipelineScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val key1 = "testp1" val key2 = "testp2" val key3 = "testp3" - val operations = - cmd.set(key1, "osx") :: cmd.get(key3) :: cmd.set(key2, "linux") :: cmd.sIsMember("foo", "bar") :: HNil + val ops = (store: TxStore[IO, String, Option[String]]) => + List( + redis.set(key1, "osx"), + redis.get(key3).flatMap(store.set(key3)), + redis.set(key2, "linux") + ) val runPipeline = - RedisPipeline(cmd) - .filterExec(operations) - .map { - case res1 ~: res2 ~: HNil => - assertEquals(res1, Some("3")) - assert(!res2) + RedisPipe + .make(redis) + .use { + _.run(ops).map(kv => assertEquals(kv.get(key3).flatten, Some("3"))) } - .onError { - case PipelineError => fail("[Error] - Pipeline failed") - case _: TimeoutException => fail("[Error] - Timeout") - case e => fail(s"[Error] - ${e.getMessage}") + .recoverWith { + case e => fail(s"[Error] - ${e.getMessage}") } for { - _ <- cmd.set(key3, "3") + _ <- redis.set(key3, "3") _ <- runPipeline - v1 <- cmd.get(key1) - v2 <- cmd.get(key2) + v1 <- redis.get(key1) + v2 <- redis.get(key2) } yield { assertEquals(v1, Some("osx")) assertEquals(v2, Some("linux")) @@ -496,7 +494,7 @@ trait TestScenarios { self: FunSuite => val val3 = "linux" val del1 = "deleteme" - val operations = (store: RedisTx.Store[IO, String, Option[String]]) => + val operations = (store: TxStore[IO, String, Option[String]]) => List( redis.set(key2, val2), redis.get(key1).flatMap(store.set(s"$key1-v1")), @@ -521,32 +519,20 @@ trait TestScenarios { self: FunSuite => } } - // TODO: not sure if this is relevant anymore - //def canceledTransactionScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { - //val key1 = "tx-1" - //val key2 = "tx-2" - //val tx = RedisTransaction(cmd) - - //val commands = cmd.set(key1, "v1") :: cmd.set(key2, "v2") :: cmd.set("tx-3", "v3") :: HNil - - //// We race it with a plain `IO.unit` so the transaction may or may not start at all but the result should be the same - //IO.race(tx.exec(commands), IO.unit) >> cmd.get(key1).map(assertEquals(_, None)) // no keys written - //} - - def scriptsScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + def scriptsScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val statusScript = """ |redis.call('set',KEYS[1],ARGV[1]) |redis.call('del',KEYS[1]) |return redis.status_reply('OK')""".stripMargin for { - fortyTwo <- cmd.eval("return 42", ScriptOutputType.Integer) + fortyTwo <- redis.eval("return 42", ScriptOutputType.Integer) _ <- IO(assertEquals(fortyTwo, 42L)) - value <- cmd.eval("return 'Hello World'", ScriptOutputType.Value) + value <- redis.eval("return 'Hello World'", ScriptOutputType.Value) _ <- IO(assertEquals(value, "Hello World")) - bool <- cmd.eval("return true", ScriptOutputType.Boolean, List("Foo")) + bool <- redis.eval("return true", ScriptOutputType.Boolean, List("Foo")) _ <- IO(assert(bool)) - list <- cmd.eval( + list <- redis.eval( "return {'Let', 'us', ARGV[1], ARGV[2]}", ScriptOutputType.Multi, Nil, @@ -556,39 +542,39 @@ trait TestScenarios { self: FunSuite => ) ) _ <- IO(assertEquals(list, List("Let", "us", "have", "fun"))) - _ <- cmd.eval(statusScript, ScriptOutputType.Status, List("test"), List("foo")) - sha42 <- cmd.scriptLoad("return 42") - fortyTwoSha <- cmd.evalSha(sha42, ScriptOutputType.Integer) + _ <- redis.eval(statusScript, ScriptOutputType.Status, List("test"), List("foo")) + sha42 <- redis.scriptLoad("return 42") + fortyTwoSha <- redis.evalSha(sha42, ScriptOutputType.Integer) _ <- IO(assertEquals(fortyTwoSha, 42L)) - shaStatusScript <- cmd.scriptLoad(statusScript) - _ <- cmd.evalSha(shaStatusScript, ScriptOutputType.Status, List("test"), List("foo", "bar")) - exists <- cmd.scriptExists(sha42, "foobar") + shaStatusScript <- redis.scriptLoad(statusScript) + _ <- redis.evalSha(shaStatusScript, ScriptOutputType.Status, List("test"), List("foo", "bar")) + exists <- redis.scriptExists(sha42, "foobar") _ <- IO(assertEquals(exists, List(true, false))) - shaStatusDigest <- cmd.digest(statusScript) + shaStatusDigest <- redis.digest(statusScript) _ <- IO(assertEquals(shaStatusScript, shaStatusDigest)) - _ <- cmd.scriptFlush - exists2 <- cmd.scriptExists(sha42) + _ <- redis.scriptFlush + exists2 <- redis.scriptExists(sha42) _ <- IO(assertEquals(exists2, List(false))) } yield () } - def hyperloglogScenario(cmd: RedisCommands[IO, String, String]): IO[Unit] = { + def hyperloglogScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = { val key = "hll" val key2 = "hll2" val key3 = "hll3" for { - x <- cmd.get(key) + x <- redis.get(key) _ <- IO(assert(x.isEmpty)) - c1 <- cmd.pfCount(key) + c1 <- redis.pfCount(key) _ <- IO(assertEquals(c1, 0L)) - _ <- cmd.pfAdd(key, "a", "b", "c") - c2 <- cmd.pfCount(key) + _ <- redis.pfAdd(key, "a", "b", "c") + c2 <- redis.pfCount(key) _ <- IO(assert(c2 > 0, "hyperloglog should think it has more than 0 items in")) - _ <- cmd.pfAdd(key2, "a", "b", "c") - c3 <- cmd.pfCount(key2) + _ <- redis.pfAdd(key2, "a", "b", "c") + c3 <- redis.pfCount(key2) _ <- IO(assert(c3 > 0, "second hyperloglog should think it has more than 0 items in")) - _ <- cmd.pfMerge(key3, key2, key) - c4 <- cmd.pfCount(key3) + _ <- redis.pfMerge(key3, key2, key) + c4 <- redis.pfCount(key3) _ <- IO(assert(c4 > 0, "merged hyperloglog should think it has more than 0 items in")) } yield () } diff --git a/site/docs/pipelining.md b/site/docs/pipelining.md index a153155d..52f83a6a 100644 --- a/site/docs/pipelining.md +++ b/site/docs/pipelining.md @@ -9,29 +9,19 @@ position: 7 Use [pipelining](https://redis.io/topics/pipelining) to speed up your queries by having full control of commands flushing. By default Redis works in autoflush mode but it can be disabled to "pipeline" commands to the server without waiting for a response. And at any point in time you can "flush commands". -`redis4cats` provides a `RedisPipeline` utility that models this behavior with some guarantees described below: +`redis4cats` provides a `RedisPipe` utility that models this behavior with some guarantees described below: -- `acquire`: disable autoflush and send a bunch of commands defined as a custom `HList`. +- `acquire`: disable autoflush and send a bunch of commands defined as a `List[F[Unit]]`. - `release`: either flush commands on success or log error on failure / cancellation. - `guarantee`: re-enable autoflush. -## Caveats - -⚠️ **Pipelining shares the same asynchronous implementation of transactions, meaning the order of the commands cannot be guaranteed.** ⚠️ - -This statement means that given the following set of operations. - -```scala -val operations = - redis.set(key1, "osx") :: redis.set(key2, "linux") :: redis.get(key1) :: - redis.set(key1, "bar") :: redis.set(key2, "foo") :: redis.get(key1) :: HNil -``` - -The result of those two `get` operations will not be deterministic. +⚠️ **Pipelining shares the same asynchronous implementation of transactions, so you can only run sequential pipelines from a single `RedisCommands` instance.** ⚠️ ### RedisPipeline usage -The API for disabling / enabling autoflush and flush commands manually is available for you to use but since the pattern is so common it is recommended to just use `RedisPipeline`. You can create a pipeline by passing the commands API as a parameter and invoke the `exec` function (or `filterExec`) given the set of commands you wish to send to the server. +The API for disabling / enabling autoflush and flush commands manually is available for you to use but since the pattern is so common it is recommended to use `RedisPipe`, because it shares the same implementation of `RedisTx`, which can be tricky to get right. + +You can create a pipeline by passing the commands API as a parameter and invoke the `run` function (or `exec`) given the set of commands you wish to send to the server. Note that every command has to be forked (`.start`) because the commands need to be sent to the server in an asynchronous way but no response will be received until the commands are successfully flushed. Also, it is not possible to sequence commands (`flatMap`) that are part of a pipeline. Every command has to be atomic and independent of previous results. @@ -54,18 +44,14 @@ val commandsApi: Resource[IO, RedisCommands[IO, String, String]] = { import cats.effect.IO import cats.implicits._ import dev.profunktor.redis4cats._ -import dev.profunktor.redis4cats.hlist._ -import dev.profunktor.redis4cats.pipeline._ -import java.util.concurrent.TimeoutException - -def putStrLn(str: String): IO[Unit] = IO(println(str)) +import dev.profunktor.redis4cats.tx._ val key1 = "testp1" val key2 = "testp2" val key3 = "testp3" val showResult: String => Option[String] => IO[Unit] = key => -_.fold(putStrLn(s"Not found key: $key"))(s => putStrLn(s"$key: $s")) + _.fold(IO.println(s"Not found key: $key"))(s => IO.println(s"$key: $s")) commandsApi.use { redis => // RedisCommands[IO, String, String] val getters = @@ -73,25 +59,22 @@ commandsApi.use { redis => // RedisCommands[IO, String, String] redis.get(key2).flatTap(showResult(key2)) >> redis.get(key3).flatTap(showResult(key3)) - val operations = - redis.set(key1, "osx") :: redis.get(key3) :: redis.set(key2, "linux") :: redis.sIsMember("foo", "bar") :: HNil + val ops = (store: TxStore[IO, String, Option[String]]) => + List( + redis.set(key1, "osx"), + redis.get(key3).flatMap(store.set(key3)), + redis.set(key2, "linux") + ) val runPipeline = - RedisPipeline(redis) - .filterExec(operations) - .map { - case res1 ~: res2 ~: HNil => - assert(res1.contains("3")) - assert(!res2) - } - .onError { - case PipelineError => - putStrLn("[Error] - Pipeline failed") - case _: TimeoutException => - putStrLn("[Error] - Timeout") - case e => - putStrLn(s"[Error] - $e") - } + RedisPipe.make(redis).use { + _.run(ops) + .flatMap(kv => IO.println(s"KV: $kv")) + .recoverWith { + case e => + IO.println(s"[Error] - ${e.getMessage}") + } + } val prog = for { @@ -104,8 +87,8 @@ commandsApi.use { redis => // RedisCommands[IO, String, String] assert(v2.contains("linux")) } - getters >> prog >> getters >> putStrLn("keep doing stuff...") + getters >> prog >> getters >> IO.println("keep doing stuff...") } ``` -The `filterExec` function filters out values of type `Unit`, which are normally irrelevant. If you find yourself needing the `Unit` types to verify some behavior, use `exec` instead. +The `run` function provides a `TxStore` we can use to store values we run within the pipeline for later retrieval, same as we do with transactions. If you don't need the store, prefer to use `exec` instead. diff --git a/site/docs/transactions.md b/site/docs/transactions.md index 7990dcac..77a7f75d 100644 --- a/site/docs/transactions.md +++ b/site/docs/transactions.md @@ -65,7 +65,7 @@ commandsApi.use { redis => // RedisCommands[IO, String, String] redis.get(key1).flatTap(showResult(key1)) >> redis.get(key2).flatTap(showResult(key2)) - val ops = (store: RedisTx.Store[IO, String, Option[String]]) => + val ops = (store: TxStore[IO, String, Option[String]]) => List( redis.set(key1, "foo"), redis.del(key2).void, @@ -93,7 +93,7 @@ It should be exclusively used to run Redis commands as part of a transaction, no Transactional commands may be discarded if something went wrong in between. -The `run` function returns the values stored in the given `RedisTx.Store`, which is used to save results of commands that run as part of the transaction for later retrieval. +The `run` function returns the values stored in the given `TxStore`, which is used to save results of commands that run as part of the transaction for later retrieval. If you are only writing values (e.g. only using `set`), you may prefer to use `exec` instead.