diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 4f093c7eb2..0756d6648f 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -102,6 +102,8 @@ private[effect] final class WorkStealingThreadPool( */ private[this] val done: AtomicBoolean = new AtomicBoolean(false) + private[this] val blockedWorkerThreadCounter: AtomicInteger = new AtomicInteger(0) + // Thread pool initialization block. { // Set up the worker threads. @@ -563,4 +565,81 @@ private[effect] final class WorkStealingThreadPool( Thread.currentThread().interrupt() } } + + private[unsafe] def blockedWorkerThreadCounterForwarder: AtomicInteger = + blockedWorkerThreadCounter + + /* + * What follows is a collection of methos used in the implementation of the + * `cats.effect.unsafe.metrics.ComputePoolSamplerMBean` interface. + */ + + /** + * Returns the number of [[WorkerThread]] instances backing the [[WorkStealingThreadPool]]. + * + * @note + * This is a fixed value, as the [[WorkStealingThreadPool]] has a fixed number of worker + * threads. + * + * @return + * the number of worker threads backing the compute pool + */ + private[unsafe] def getWorkerThreadCount(): Int = + threadCount + + /** + * Returns the number of active [[WorkerThread]] instances currently executing fibers on the + * compute thread pool. + * + * @return + * the number of active worker threads + */ + private[unsafe] def getActiveThreadCount(): Int = { + val st = state.get() + (st & UnparkMask) >>> UnparkShift + } + + /** + * Returns the number of [[WorkerThread]] instances currently searching for fibers to steal + * from other worker threads. + * + * @return + * the number of worker threads searching for work + */ + private[unsafe] def getSearchingThreadCount(): Int = { + val st = state.get() + st & SearchMask + } + + /** + * Returns the number of [[WorkerThread]] instances which are currently blocked due to running + * blocking actions on the compute thread pool. + * + * @return + * the number of blocked worker threads + */ + private[unsafe] def getBlockedWorkerThreadCount(): Int = + blockedWorkerThreadCounter.get() + + /** + * Returns the total number of fibers enqueued on all local queues. + * + * @return + * the total number of fibers enqueued on all local queues + */ + private[unsafe] def getLocalQueueFiberCount(): Long = + localQueues.map(_.size().toLong).sum + + /** + * Returns the number of fibers which are currently asynchronously suspended. + * + * @note + * This counter is not synchronized due to performance reasons and might be reporting + * out-of-date numbers. + * + * @return + * the number of asynchronously suspended fibers + */ + private[unsafe] def getSuspendedFiberCount(): Long = + workerThreads.map(_.getSuspendedFiberCount().toLong).sum } diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 29fc80195c..2bf883fc21 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -517,6 +517,7 @@ private final class WorkerThread( // this worker thread has run its course and it is time to die, after the // blocking code has been successfully executed. blocking = true + pool.blockedWorkerThreadCounterForwarder.incrementAndGet() // Spawn a new `WorkerThread`, a literal clone of this one. It is safe to // transfer ownership of the local queue and the parked signal to the new @@ -536,7 +537,25 @@ private final class WorkerThread( // With another `WorkerThread` started, it is time to execute the blocking // action. - thunk + val result = thunk + + pool.blockedWorkerThreadCounterForwarder.decrementAndGet() + + result } } + + /** + * Returns the number of fibers which are currently asynchronously suspended and tracked by + * this worker thread. + * + * @note + * This counter is not synchronized due to performance reasons and might be reporting + * out-of-date numbers. + * + * @return + * the number of asynchronously suspended fibers + */ + def getSuspendedFiberCount(): Int = + fiberBag.size() } diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSampler.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSampler.scala new file mode 100644 index 0000000000..d30d87c624 --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSampler.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.unsafe +package metrics + +/** + * An implementation of the [[ComputePoolSamplerMBean]] interface which simply delegates to the + * corresponding methods of the [[cats.effect.unsafe.WorkStealingThreadPool]] being monitored. + * + * @param queue + * the monitored local queue + */ +private[unsafe] final class ComputePoolSampler(compute: WorkStealingThreadPool) + extends ComputePoolSamplerMBean { + def getWorkerThreadCount(): Int = compute.getWorkerThreadCount() + def getActiveThreadCount(): Int = compute.getActiveThreadCount() + def getSearchingThreadCount(): Int = compute.getSearchingThreadCount() + def getBlockedWorkerThreadCount(): Int = compute.getBlockedWorkerThreadCount() + def getLocalQueueFiberCount(): Long = compute.getLocalQueueFiberCount() + def getSuspendedFiberCount(): Long = compute.getSuspendedFiberCount() +} diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSamplerMBean.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSamplerMBean.scala new file mode 100644 index 0000000000..58cffc3eb8 --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSamplerMBean.scala @@ -0,0 +1,78 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.unsafe.metrics + +/** + * An MBean interface for monitoring a [[WorkStealingThreadPool]] backed compute thread pool. + */ +private[unsafe] trait ComputePoolSamplerMBean { + + /** + * Returns the number of [[WorkerThread]] instances backing the [[WorkStealingThreadPool]]. + * + * @note + * This is a fixed value, as the [[WorkStealingThreadPool]] has a fixed number of worker + * threads. + * + * @return + * the number of worker threads backing the compute pool + */ + def getWorkerThreadCount(): Int + + /** + * Returns the number of active [[WorkerThread]] instances currently executing fibers on the + * compute thread pool. + * + * @return + * the number of active worker threads + */ + def getActiveThreadCount(): Int + + /** + * Returns the number of [[WorkerThread]] instances currently searching for fibers to steal + * from other worker threads. + * + * @return + * the number of worker threads searching for work + */ + def getSearchingThreadCount(): Int + + /** + * Returns the number of [[WorkerThread]] instances which are currently blocked due to running + * blocking actions on the compute thread pool. + * + * @return + * the number of blocked worker threads + */ + def getBlockedWorkerThreadCount(): Int + + /** + * Returns the total number of fibers enqueued on all local queues. + * + * @return + * the total number of fibers enqueued on all local queues + */ + def getLocalQueueFiberCount(): Long + + /** + * Returns the number of fibers which are currently asynchronously suspended. + * + * @return + * the number of asynchronously suspended fibers + */ + def getSuspendedFiberCount(): Long +} diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/LocalQueueSampler.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/LocalQueueSampler.scala index 19985ea9c2..3abc9eb579 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/LocalQueueSampler.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/LocalQueueSampler.scala @@ -24,7 +24,8 @@ package metrics * @param queue * the monitored local queue */ -private[unsafe] class LocalQueueSampler(queue: LocalQueue) extends LocalQueueSamplerMBean { +private[unsafe] final class LocalQueueSampler(queue: LocalQueue) + extends LocalQueueSamplerMBean { def getFiberCount(): Int = queue.getFiberCount() def getHeadIndex(): Int = queue.getHeadIndex() def getTailIndex(): Int = queue.getTailIndex()