Skip to content

Commit

Permalink
Join worker threads on pool shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Aug 27, 2023
1 parent 1a7f085 commit 8a37117
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ lazy val tests: CrossProject = crossProject(JSPlatform, JVMPlatform, NativePlatf
scalacOptions ~= { _.filterNot(_.startsWith("-P:scalajs:mapSourceURI")) }
)
.jvmSettings(
Test / fork := true,
fork := true,
Test / javaOptions += s"-Dsbt.classpath=${(Test / fullClasspath).value.map(_.data.getAbsolutePath).mkString(File.pathSeparator)}"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ private[effect] final class WorkStealingThreadPool[P](
def shutdown(): Unit = {
// Clear the interrupt flag.
val interruptCalling = Thread.interrupted()
val currentThread = Thread.currentThread()

// Execute the shutdown logic only once.
if (done.compareAndSet(false, true)) {
Expand All @@ -701,27 +702,31 @@ private[effect] final class WorkStealingThreadPool[P](
// the face of unhandled exceptions or as part of the whole JVM exiting.
var i = 0
while (i < threadCount) {
workerThreads(i).interrupt()
val workerThread = workerThreads(i)
if (workerThread ne currentThread) {
workerThread.interrupt()
workerThread.join()
// wait to stop before closing pollers
}
system.closePoller(pollers(i))
i += 1
}

system.close()

// Clear the interrupt flag.
Thread.interrupted()

var t: WorkerThread[P] = null
while ({
t = cachedThreads.pollFirst()
t ne null
}) {
t.interrupt()
// don't join, blocking threads may be uninterruptibly blocked.
// anyway, they do not have pollers to close.
}

// Drain the external queue.
externalQueue.clear()
if (interruptCalling) Thread.currentThread().interrupt()
if (interruptCalling) currentThread.interrupt()
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/jvm/src/main/scala/catseffect/examplesplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ package examples {
super.runtimeConfig.copy(shutdownHookTimeout = Duration.Zero)

val run: IO[Unit] =
IO(System.exit(0)).uncancelable
IO.blocking(System.exit(0)).uncancelable
}

object FatalErrorUnsafeRun extends IOApp {
Expand Down

0 comments on commit 8a37117

Please sign in to comment.