-
Notifications
You must be signed in to change notification settings - Fork 357
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix query cancellation with cancelable #2079
fix query cancellation with cancelable #2079
Conversation
699a386
to
5591005
Compare
@TalkingFoxMid thank you for giving it a shot! @jatcwang, to be honest, I personally like this approach better. I wonder, is there a chance that we could weigh this PR in? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but in fact it is not that much because most of the changes happened in the auto-generated code.
yes, would you mind pointing me at the non-auto-generated changes? 😅 still at a glance I think this approach is better too, since its using the existing cancelable
IIUC.
} | ||
} | ||
|
||
scenario.unsafeRunSync() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of unsafeRunSync()
should use munit-cats-effect CatsEffectSuite
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We currently don't use it at the moment - I can handle this in another PR to move all tests to use CatsEffectSuite
@armanbilge Non-autogenerated changes are in modules/core/src/main/scala/doobie/hi/connection.scala |
which made We should also need to make Streaming queries cancellable ( In general I'm quite happy with this approach too as it should have less overhead 👍 Thanks @TalkingFoxMid ! |
5591005
to
4b90d8f
Compare
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
To be cancelable, the @@ -10,8 +10,8 @@ object TestCancelable extends IOApp.Simple {
acquire.flatMap(
r =>
unmask {
- IO.sleep(5.seconds) >> IO(println(s"USE $r"))
- }.onCancel(IO(println(s"release $r")))
+ (IO.sleep(5.seconds) >> IO(println(s"USE $r"))).onCancel(IO(println(s"release $r")))
+ }
)
} After making this change the result is:
|
This comment was marked as resolved.
This comment was marked as resolved.
@TalkingFoxMid thanks, I think there is still some confusion. object TestCancelable extends IOApp.Simple {
val acquire: IO[String] = IO.println("START AQ") >> IO.sleep(2.seconds) >> IO.println("END AQ") >> IO("RESOURCE")
def evaluation: IO[Unit] = IO.uncancelable {
unmask =>
acquire.flatMap(
r =>
unmask {
(IO.sleep(5.seconds) >> IO(println(s"USE ${r}"))).cancelable(IO(println(s"canceling USE ${r}")))
}.onCancel(IO(println(s"release ${r}")))
)
}
override def run: IO[Unit] =
for {
f <- evaluation.start
_ <- IO.sleep(1.seconds)
_ <- f.cancel
_ <- IO.never[Unit]
} yield ()
}
|
This comment was marked as resolved.
This comment was marked as resolved.
Edit: Sergey's answer in #2079 (comment) is much better :) my original comment is hidden below.
Strange, I was not able to replicate your result. It works for me. Here is the exact code I am running, which is directly copied from your example: //> using dep org.typelevel::cats-effect::3.5.4
import cats.effect.{IO, IOApp}
import scala.concurrent.duration.DurationInt
object TestCancelable extends IOApp.Simple {
val acquire: IO[String] = IO.println("START AQ") >> IO.sleep(2.seconds) >> IO.println("END AQ") >> IO("RESOURCE")
def evaluation: IO[Unit] = IO.uncancelable {
unmask =>
acquire.flatMap(
r =>
unmask {
(IO(Thread.sleep(5)) >> IO(println(s"USE ${r}"))).cancelable(IO(println(s"canceling USE ${r}")))
}.onCancel(IO(println(s"release ${r}")))
)
}
override def run: IO[Unit] =
for {
f <- evaluation.start
_ <- IO.sleep(1.seconds)
_ <- f.cancel
_ <- IO.never[Unit]
} yield ()
}
Note, the correct way to write a fiber on a blocked thread is |
Actually, your example (after I fix
I.e. the callback effect The reason why the However, if you change your timings to the following values: val acquire: IO[String] =
IO.println("START AQ") >> IO.sleep(2.seconds) >> IO.println("END AQ") >> IO("RESOURCE")
def evaluation: IO[Unit] = IO.uncancelable { unmask =>
acquire.flatMap(r =>
unmask {
// Sleep time increased to 2 seconds (was 5 ms)
(IO(Thread.sleep(2000)) >> IO(println(s"USE $r")))
.cancelable(IO(println(s"canceling USE $r")))
}.onCancel(IO(println(s"release ${r}")))
)
}
override def run: IO[Unit] =
for {
f <- evaluation.start
_ <- IO.sleep(3.seconds) // increased to 3 seconds
_ <- f.cancel
_ <- IO.never[Unit]
} yield ()
Here the cancel signal comes inside the unmasked region. |
Sorry for confusion and thank you all for trying to explain, it was very helpful. Now I see that there is cases when However there is also small issue: call duplication. There are some cases when |
90c645c
to
5b5e3e9
Compare
I've finally done a solution that seems to be complete now and looking forward for your comments/approves As for streams, I suggest doing this in another PR later |
4db468d
to
bdd1979
Compare
val logEventRef: Ref[IO, LogEvent] = | ||
Ref.of[IO, LogEvent](null).unsafeRunSync() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IOLocal is no longer works in logHandler because there is cancelable
which forks a fiber in a middle of query processing and IOLocal context isn't shared between two fibers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we guarantee that IOLocal works with logHandler ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ouch yes we probably need to. One of our docuemented examples also uses IOLocal to pass additional context to LogHandler.
In a perfect world, I do feel like IO.cancelable
implementation should pass on the IOLocal though..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a perfect world, I do feel like
IO.cancelable
implementation should pass on the IOLocal though..
In general, you cannot make these kinds of assumptions when using IOLocal
with combinators. Further reading:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there are a lot of problems with maintaining this feature:
- It is not very functional. I think the most appropriate way to do the same - use writer monads, and I suppose that is possible in current setup.
- It restricts ours capabilities to rely on CE3 functional combinators in Doobie code
To be honest, I think that IOLocal should be used exclusively in the internal code (as it is impure) of an application (or library). The API of a functional library should not provide (or at least guarantee that it will work fine) the use of this tool to users.
So I wish to stop maintaining this feature, remove code example and migrate users to more suitable instruments (like a writer monad or a reader with Ref in context). Is it possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please give a reference to place in the documentation where IOLocal is used. IOLocal can be used to pass context to LogHandler. But It can't be used to modify this context by logHandler if we use cancelable (in tests loghandler modifies IOLocal context, it puts log in IOLocal). I've found this example in doc:
import cats.effect.{IOLocal, Ref}
import doobie.util.log.Success
def users = List.range(0, 4).map(n => s"user-$n")
def program: IO[List[String]] =
for {
// define an IOLocal where we store the user which caused the query to be run
currentUser <- IOLocal("")
// store all successful sql here, for all users
successLogsRef <- Ref[IO].of(List.empty[String])
xa = Transactor.fromDriverManager[IO](
driver = "org.h2.Driver",
url = "jdbc:h2:mem:queryspec;DB_CLOSE_DELAY=-1",
user = "sa",
password = "",
logHandler = Some(new LogHandler[IO] {
def run(logEvent: LogEvent): IO[Unit] =
currentUser.get.flatMap(user => successLogsRef.update(logs => s"sql for user $user: '${logEvent.sql}'" :: logs))
})
)
// run a bunch of queries
_ <- users.parTraverse(user =>
for {
_ <- currentUser.set(user)
_ <- sql"select 1".query[Int].unique.transact(xa)
} yield ()
)
// return collected log messages
logs <- successLogsRef.get
} yield logs
program.unsafeRunSync().sorted
It will work because child fibers inherit context of parents. So if we set up context in parrent fiber it can be accessed in loghandler. But if we modify context in loghandler this modification can't be propagated to a parent fiber. In tests there is modification of context in child fiber
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider these two examples:
import cats.effect.{IO, IOApp, IOLocal}
object IOCancelTest extends IOApp.Simple {
def childTask(local: IOLocal[Int]): IO[Unit] =
for {
_ <- local.get.flatTap(IO.println) // 42 inherits from parent fiber
} yield ()
override def run: IO[Unit] = for {
local <- IOLocal(42)
_ <- childTask(local).cancelable(IO(println("some finalizer")))
} yield ()
}
import cats.effect.{IO, IOApp, IOLocal}
object IOCancelTest2 extends IOApp.Simple {
def childTask(local: IOLocal[Int]): IO[Unit] =
for {
_ <- local.set(64)
} yield ()
override def run: IO[Unit] = for {
local <- IOLocal(42)
_ <- childTask(local).cancelable(IO(println("some finalizer")))
_ <- local.get.flatTap(IO.println) // 42 even after update in child fiber
} yield ()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep you've found the example in the docs 👍
I think trying to modify IOLocal within a ConnectionIO should be quite rare. Since this is a general issue with IOLocal propagation it's not something we can solve here.
Thanks for updating the tests to use Ref 👍
|
||
import scala.concurrent.duration.DurationInt | ||
|
||
class HikariQueryCancellationSuite extends munit.FunSuite { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed QueryCancellationSuite in favor of more appropriate HikariQueryCancellationSuite because current solution doesn't work well with QueryCancellationSuite. Here is code for history:
// Copyright (c) 2013-2020 Rob Norris and Contributors
// This software is licensed under the MIT License (MIT).
// For more information see LICENSE or https://opensource.org/licenses/MIT
package doobie.util
import cats.effect.IO
import doobie.Transactor
import doobie.*
import doobie.implicits.*
import cats.syntax.all.*
import scala.concurrent.duration.DurationInt
class QueryCancellationSuite extends munit.FunSuite {
import cats.effect.unsafe.implicits.global
val xa = Transactor.fromDriverManager[IO](
driver = "org.h2.Driver",
url = "jdbc:h2:mem:queryspec;DB_CLOSE_DELAY=-1;DEFAULT_LOCK_TIMEOUT=60000;LOCK_TIMEOUT=60000",
user = "sa",
password = "",
logHandler = None
)
test("Query cancel") {
val scenario = WeakAsync.liftIO[ConnectionIO].use { elevator =>
for {
_ <- sql"CREATE TABLE IF NOT EXISTS example_table ( id INT)".update.run.transact(xa)
_ <- sql"TRUNCATE TABLE example_table".update.run.transact(xa)
_ <- sql"INSERT INTO example_table (id) VALUES (1)".update.run.transact(xa)
_ <- {
sql"select * from example_table for update".query[Int].unique >> elevator.liftIO(IO.never)
}.transact(xa).start
insertWithLockFiber <- {
for {
_ <- IO.sleep(100.milli)
insertFiber <- sql"UPDATE example_table SET id = 2".update.run.transact(xa).start
_ <- IO.sleep(100.milli)
_ <- insertFiber.cancel
} yield ()
}.start
_ <- IO.race(insertWithLockFiber.join, IO.sleep(9.seconds) >> IO(fail("Cancellation is blocked")))
result <- sql"SELECT * FROM example_table".query[Int].to[List].transact(xa)
} yield assertEquals(result, List(1))
}
scenario.unsafeRunSync()
}
}
Sorry, what is an example of a case where |
.bracket(ps => IFC.embed(ps, prepLogged *> execAndProcessLogged))(ps => IFC.embed(ps, IFPS.close)) | ||
.bracket(ps => | ||
WeakAsyncConnectionIO.cancelable( | ||
IFC.embed(ps, prepLogged *> execAndProcessLogged), | ||
IFC.embed(ps, IFPS.close) | ||
))(IFC.embed(_, IFPS.close) ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks right 👍
Sorry for confusion. I mean that if we want finalizer to run instantly after we cancel a fiber - we must use |
I feel there's still a confusion.
|
Yes, and to add on to that a bit: I would not say that |
I guess the confusion might come from the fact, "finalizer" is the name used in the docs for
Also, its parameter name is |
bc252b1
to
061d092
Compare
061d092
to
ed60a5f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This LGTM thanks @TalkingFoxMid! Very much appreciate the reviews too @armanbilge @satorg
Alternative fix of problem described here #2077