Skip to content

Commit

Permalink
Merge pull request typelevel#3800 from forkedcancel/bug/hotswap-block…
Browse files Browse the repository at this point in the history
…s-get

Fix `Hotswap#get` being blocked by `Hotswap#swap`
  • Loading branch information
armanbilge authored Sep 18, 2023
2 parents 9617156 + 854e2b8 commit 94a9059
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 5 deletions.
10 changes: 5 additions & 5 deletions std/shared/src/main/scala/cats/effect/std/Hotswap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ object Hotswap {
new Hotswap[F, R] {

override def swap(next: Resource[F, R]): F[R] =
exclusive.surround {
F.uncancelable { poll =>
poll(next.allocated).flatMap {
case (r, fin) =>
F.uncancelable { poll =>
poll(next.allocated).flatMap {
case (r, fin) =>
exclusive.mapK(poll).onCancel(Resource.eval(fin)).surround {
swapFinalizer(Acquired(r, fin)).as(r)
}
}
}
}

Expand Down
44 changes: 44 additions & 0 deletions tests/shared/src/test/scala/cats/effect/std/HotswapSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package cats
package effect
package std

import cats.effect.Resource
import cats.effect.kernel.Ref
import cats.effect.testkit.TestControl
import cats.effect.unsafe.IORuntimeConfig

import scala.concurrent.duration._

Expand Down Expand Up @@ -104,6 +107,47 @@ class HotswapSpec extends BaseSpec { outer =>

go must completeAs(())
}

"not block current resource while swap is instantiating new one" in ticked {
implicit ticker =>
val go = Hotswap.create[IO, Unit].use { hs =>
hs.swap(IO.sleep(1.minute).toResource).start *>
IO.sleep(5.seconds) *>
hs.get.use_.timeout(1.second).void
}
go must completeAs(())
}

"successfully cancel during swap and run finalizer if cancelation is requested while waiting for get to release" in ticked {
implicit ticker =>
val go = Ref.of[IO, List[String]](List()).flatMap { log =>
Hotswap[IO, Unit](logged(log, "a")).use {
case (hs, _) =>
for {
_ <- hs.get.evalMap(_ => IO.sleep(1.minute)).use_.start
_ <- IO.sleep(2.seconds)
_ <- hs.swap(logged(log, "b")).timeoutTo(1.second, IO.unit)
value <- log.get
} yield value
}
}

go must completeAs(List("open a", "open b", "close b"))
}

"swap is safe to concurrent cancelation" in ticked { implicit ticker =>
val go = IO.ref(false).flatMap { open =>
Hotswap[IO, Unit](Resource.unit)
.use {
case (hs, _) =>
hs.swap(Resource.make(open.set(true))(_ =>
open.getAndSet(false).map(_ should beTrue).void))
}
.race(IO.unit) *> open.get.map(_ must beFalse)
}

TestControl.executeEmbed(go, IORuntimeConfig(1, 2)).replicateA_(1000) must completeAs(())
}
}

}

0 comments on commit 94a9059

Please sign in to comment.