Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elide thunk allocation when using sleepInternal #3775

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,12 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
"cats.effect.IOFiberConstants.ExecuteRunnableR"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.IOLocal.scope"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.IOFiberConstants.ContStateResult")
"cats.effect.IOFiberConstants.ContStateResult"),
// #3775, changes to internal timers APIs
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"cats.effect.unsafe.TimerSkipList.insert"),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"cats.effect.unsafe.WorkerThread.sleep")
) ++ {
if (tlIsScala3.value) {
// Scala 3 specific exclusions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private[effect] sealed abstract class WorkStealingThreadPool private ()
private[effect] def reschedule(runnable: Runnable): Unit
private[effect] def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Runnable
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable
private[effect] def sleep(
delay: FiniteDuration,
task: Runnable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,21 @@ private final class TimerSkipList() extends AtomicLong(MARKER + 1L) { sequenceNu
cb: Callback,
next: Node
) extends TimerSkipListNodeBase[Callback, Node](cb, next)
with Function0[Unit]
with Runnable {

/**
* Cancels the timer
*/
final override def run(): Unit = {
final def apply(): Unit = {
// TODO: We could null the callback here directly,
// TODO: and the do the lookup after (for unlinking).
TimerSkipList.this.doRemove(triggerTime, sequenceNum)
()
}

final def run() = apply()

private[TimerSkipList] final def isMarker: Boolean = {
// note: a marker node also has `triggerTime == MARKER`,
// but that's also a valid trigger time, so we need
Expand Down Expand Up @@ -158,7 +161,7 @@ private final class TimerSkipList() extends AtomicLong(MARKER + 1L) { sequenceNu
delay: Long,
callback: Right[Nothing, Unit] => Unit,
tlr: ThreadLocalRandom
): Runnable = {
): Function0[Unit] with Runnable = {
require(delay >= 0L)
// we have to check for overflow:
val triggerTime = computeTriggerTime(now = now, delay = delay)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,9 @@ private[effect] final class WorkStealingThreadPool(
/**
* Tries to call the current worker's `sleep`, but falls back to `sleepExternal` if needed.
*/
def sleepInternal(delay: FiniteDuration, callback: Right[Nothing, Unit] => Unit): Runnable = {
def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
val thread = Thread.currentThread()
if (thread.isInstanceOf[WorkerThread]) {
val worker = thread.asInstanceOf[WorkerThread]
Expand All @@ -642,7 +644,7 @@ private[effect] final class WorkStealingThreadPool(
*/
private[this] final def sleepExternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Runnable = {
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
val random = ThreadLocalRandom.current()
val idx = random.nextInt(threadCount)
val tsl = sleepers(idx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ private final class WorkerThread(
}
}

def sleep(delay: FiniteDuration, callback: Right[Nothing, Unit] => Unit): Runnable = {
def sleep(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
// take the opportunity to update the current time, just in case other timers can benefit
val _now = System.nanoTime()
now = _now
Expand Down
16 changes: 10 additions & 6 deletions core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -925,13 +925,17 @@ private final class IOFiber[A](
IO {
val scheduler = runtime.scheduler

val cancel =
if (scheduler.isInstanceOf[WorkStealingThreadPool])
scheduler.asInstanceOf[WorkStealingThreadPool].sleepInternal(delay, cb)
else
scheduler.sleep(delay, () => cb(RightUnit))
val cancelIO =
if (scheduler.isInstanceOf[WorkStealingThreadPool]) {
val cancel =
scheduler.asInstanceOf[WorkStealingThreadPool].sleepInternal(delay, cb)
IO.Delay(cancel, null)
} else {
val cancel = scheduler.sleep(delay, () => cb(RightUnit))
IO(cancel.run())
}

Some(IO(cancel.run()))
Some(cancelIO)
}
}
else IO.cede
Expand Down