From b31a03eff32e1ad500e79c965bdc05933a7a4e24 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Fri, 12 Nov 2021 23:12:44 +0100 Subject: [PATCH 1/8] Define the MBean interface --- .../metrics/ComputePoolSamplerMBean.scala | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSamplerMBean.scala diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSamplerMBean.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSamplerMBean.scala new file mode 100644 index 0000000000..6b43d751bf --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSamplerMBean.scala @@ -0,0 +1,86 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.unsafe.metrics + +/** + * An MBean interface for monitoring a [[WorkStealingThreadPool]] backed compute thread pool. + */ +private[unsafe] trait ComputePoolSamplerMBean { + + /** + * Returns the number of [[WorkerThread]] instances backing the [[WorkStealingThreadPool]]. + * + * @note + * This is a fixed value, as the [[WorkStealingThreadPool]] has a fixed number of worker + * threads. + * + * @return + * the number of worker threads backing the compute pool + */ + def getWorkerThreadCount(): Int + + /** + * Returns the number of active [[WorkerThread]] instances currently executing fibers on the + * compute thread pool. + * + * @return + * the number of active worker threads + */ + def getActiveThreadCount(): Int + + /** + * Returns the number of [[WorkerThread]] instances currently searching for fibers to steal + * from other worker threads. + * + * @return + * the number of worker threads searching for work + */ + def getSearchingThreadCount(): Int + + /** + * Returns the number of [[WorkerThread]] instances which are currently blocked due to running + * blocking actions on the compute thread pool. + * + * @return + * the number of blocked worker threads + */ + def getBlockedWorkerThreadCount(): Int + + /** + * Returns the number of fibers enqueued on the batched queue. + * + * @return + * the number of fibers enqueued on the batched queue + */ + def getBatchedQueueFiberCount(): Long + + /** + * Returns the total number of fibers enqueued on all local queues. + * + * @return + * the total number of fibers enqueued on all local queues + */ + def getLocalQueueFiberCount(): Long + + /** + * Returns the number of fibers which are currently asynchronously suspended. + * + * @return + * the number of asynchronously suspended fibers + */ + def getSuspendedFiberCount(): Long +} From 2697b8d50350a6ab1d5fb1553f5f53f8f32f150d Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Fri, 12 Nov 2021 23:23:32 +0100 Subject: [PATCH 2/8] Implement the simple metrics --- .../unsafe/WorkStealingThreadPool.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 4f093c7eb2..6b15cfa838 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -563,4 +563,55 @@ private[effect] final class WorkStealingThreadPool( Thread.currentThread().interrupt() } } + + /* + * What follows is a collection of methos used in the implementation of the + * `cats.effect.unsafe.metrics.ComputePoolSamplerMBean` interface. + */ + + /** + * Returns the number of [[WorkerThread]] instances backing the [[WorkStealingThreadPool]]. + * + * @note + * This is a fixed value, as the [[WorkStealingThreadPool]] has a fixed number of worker + * threads. + * + * @return + * the number of worker threads backing the compute pool + */ + def getWorkerThreadCount(): Int = + threadCount + + /** + * Returns the number of active [[WorkerThread]] instances currently executing fibers on the + * compute thread pool. + * + * @return + * the number of active worker threads + */ + def getActiveThreadCount(): Int = { + val st = state.get() + (st & UnparkMask) >>> UnparkShift + } + + /** + * Returns the number of [[WorkerThread]] instances currently searching for fibers to steal + * from other worker threads. + * + * @return + * the number of worker threads searching for work + */ + def getSearchingThreadCount(): Int = { + val st = state.get() + st & SearchMask + } + + /** + * Returns the total number of fibers enqueued on all local queues. + * + * @return + * the total number of fibers enqueued on all local queues + */ + def getLocalQueueFiberCount(): Long = + localQueues.map(_.size().toLong).sum } From 9ebfb4eb189d75c12f316020fc9b7ada6fd6b278 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Fri, 12 Nov 2021 23:37:19 +0100 Subject: [PATCH 3/8] Implement suspended fiber count metric --- .../effect/unsafe/WorkStealingThreadPool.scala | 13 +++++++++++++ .../scala/cats/effect/unsafe/WorkerThread.scala | 14 ++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 6b15cfa838..33b133c8bb 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -614,4 +614,17 @@ private[effect] final class WorkStealingThreadPool( */ def getLocalQueueFiberCount(): Long = localQueues.map(_.size().toLong).sum + + /** + * Returns the number of fibers which are currently asynchronously suspended. + * + * @note + * This counter is not synchronized due to performance reasons and might be reporting + * out-of-date numbers. + * + * @return + * the number of asynchronously suspended fibers + */ + def getSuspendedFiberCount(): Long = + workerThreads.map(_.getSuspendedFiberCount().toLong).sum } diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 29fc80195c..833d268c6e 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -539,4 +539,18 @@ private final class WorkerThread( thunk } } + + /** + * Returns the number of fibers which are currently asynchronously suspended and tracked by + * this worker thread. + * + * @note + * This counter is not synchronized due to performance reasons and might be reporting + * out-of-date numbers. + * + * @return + * the number of asynchronously suspended fibers + */ + def getSuspendedFiberCount(): Int = + fiberBag.size() } From 9e1e9418681a3c5f927b07c7cd4fc464b791c9a7 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Fri, 12 Nov 2021 23:45:46 +0100 Subject: [PATCH 4/8] Remove batched queue fibers metric --- .../effect/unsafe/metrics/ComputePoolSamplerMBean.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSamplerMBean.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSamplerMBean.scala index 6b43d751bf..58cffc3eb8 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSamplerMBean.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSamplerMBean.scala @@ -60,14 +60,6 @@ private[unsafe] trait ComputePoolSamplerMBean { */ def getBlockedWorkerThreadCount(): Int - /** - * Returns the number of fibers enqueued on the batched queue. - * - * @return - * the number of fibers enqueued on the batched queue - */ - def getBatchedQueueFiberCount(): Long - /** * Returns the total number of fibers enqueued on all local queues. * From b9252e91a715e36a59de00a6d53c84b8011f4d02 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Fri, 12 Nov 2021 23:50:50 +0100 Subject: [PATCH 5/8] Implement blocked worker thread metric --- .../effect/unsafe/WorkStealingThreadPool.scala | 15 +++++++++++++++ .../scala/cats/effect/unsafe/WorkerThread.scala | 7 ++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 33b133c8bb..bffcc3e4c4 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -102,6 +102,8 @@ private[effect] final class WorkStealingThreadPool( */ private[this] val done: AtomicBoolean = new AtomicBoolean(false) + private[this] val blockedWorkerThreadCounter: AtomicInteger = new AtomicInteger(0) + // Thread pool initialization block. { // Set up the worker threads. @@ -564,6 +566,9 @@ private[effect] final class WorkStealingThreadPool( } } + def blockedWorkerThreadCounterForwarder: AtomicInteger = + blockedWorkerThreadCounter + /* * What follows is a collection of methos used in the implementation of the * `cats.effect.unsafe.metrics.ComputePoolSamplerMBean` interface. @@ -606,6 +611,16 @@ private[effect] final class WorkStealingThreadPool( st & SearchMask } + /** + * Returns the number of [[WorkerThread]] instances which are currently blocked due to running + * blocking actions on the compute thread pool. + * + * @return + * the number of blocked worker threads + */ + def getBlockedWorkerThreadCount(): Int = + blockedWorkerThreadCounter.get() + /** * Returns the total number of fibers enqueued on all local queues. * diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 833d268c6e..2bf883fc21 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -517,6 +517,7 @@ private final class WorkerThread( // this worker thread has run its course and it is time to die, after the // blocking code has been successfully executed. blocking = true + pool.blockedWorkerThreadCounterForwarder.incrementAndGet() // Spawn a new `WorkerThread`, a literal clone of this one. It is safe to // transfer ownership of the local queue and the parked signal to the new @@ -536,7 +537,11 @@ private final class WorkerThread( // With another `WorkerThread` started, it is time to execute the blocking // action. - thunk + val result = thunk + + pool.blockedWorkerThreadCounterForwarder.decrementAndGet() + + result } } From 63469a906c8c30991c6e92661ef593c6ff34a5a2 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Fri, 12 Nov 2021 23:52:29 +0100 Subject: [PATCH 6/8] Adjust package visibility of metrics implementation methods --- .../effect/unsafe/WorkStealingThreadPool.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index bffcc3e4c4..0756d6648f 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -566,7 +566,7 @@ private[effect] final class WorkStealingThreadPool( } } - def blockedWorkerThreadCounterForwarder: AtomicInteger = + private[unsafe] def blockedWorkerThreadCounterForwarder: AtomicInteger = blockedWorkerThreadCounter /* @@ -584,7 +584,7 @@ private[effect] final class WorkStealingThreadPool( * @return * the number of worker threads backing the compute pool */ - def getWorkerThreadCount(): Int = + private[unsafe] def getWorkerThreadCount(): Int = threadCount /** @@ -594,7 +594,7 @@ private[effect] final class WorkStealingThreadPool( * @return * the number of active worker threads */ - def getActiveThreadCount(): Int = { + private[unsafe] def getActiveThreadCount(): Int = { val st = state.get() (st & UnparkMask) >>> UnparkShift } @@ -606,7 +606,7 @@ private[effect] final class WorkStealingThreadPool( * @return * the number of worker threads searching for work */ - def getSearchingThreadCount(): Int = { + private[unsafe] def getSearchingThreadCount(): Int = { val st = state.get() st & SearchMask } @@ -618,7 +618,7 @@ private[effect] final class WorkStealingThreadPool( * @return * the number of blocked worker threads */ - def getBlockedWorkerThreadCount(): Int = + private[unsafe] def getBlockedWorkerThreadCount(): Int = blockedWorkerThreadCounter.get() /** @@ -627,7 +627,7 @@ private[effect] final class WorkStealingThreadPool( * @return * the total number of fibers enqueued on all local queues */ - def getLocalQueueFiberCount(): Long = + private[unsafe] def getLocalQueueFiberCount(): Long = localQueues.map(_.size().toLong).sum /** @@ -640,6 +640,6 @@ private[effect] final class WorkStealingThreadPool( * @return * the number of asynchronously suspended fibers */ - def getSuspendedFiberCount(): Long = + private[unsafe] def getSuspendedFiberCount(): Long = workerThreads.map(_.getSuspendedFiberCount().toLong).sum } From b8367ebaac3a884b0d3257541c9a518ed1a853b5 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Fri, 12 Nov 2021 23:55:27 +0100 Subject: [PATCH 7/8] Implement the sampler class --- .../unsafe/metrics/ComputePoolSampler.scala | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSampler.scala diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSampler.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSampler.scala new file mode 100644 index 0000000000..d015150b62 --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSampler.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2020-2021 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.unsafe +package metrics + +/** + * An implementation of the [[ComputePoolSamplerMBean]] interface which simply delegates to the + * corresponding methods of the [[cats.effect.unsafe.WorkStealingThreadPool]] being monitored. + * + * @param queue + * the monitored local queue + */ +private[unsafe] class ComputePoolSampler(compute: WorkStealingThreadPool) + extends ComputePoolSamplerMBean { + def getWorkerThreadCount(): Int = compute.getWorkerThreadCount() + def getActiveThreadCount(): Int = compute.getActiveThreadCount() + def getSearchingThreadCount(): Int = compute.getSearchingThreadCount() + def getBlockedWorkerThreadCount(): Int = compute.getBlockedWorkerThreadCount() + def getLocalQueueFiberCount(): Long = compute.getLocalQueueFiberCount() + def getSuspendedFiberCount(): Long = compute.getSuspendedFiberCount() +} From 2a73aed0b91c17f1992de51ff53f30af52067631 Mon Sep 17 00:00:00 2001 From: Vasil Vasilev Date: Sun, 14 Nov 2021 00:18:37 +0100 Subject: [PATCH 8/8] Make the MBean implementation classes final --- .../scala/cats/effect/unsafe/metrics/ComputePoolSampler.scala | 2 +- .../scala/cats/effect/unsafe/metrics/LocalQueueSampler.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSampler.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSampler.scala index d015150b62..d30d87c624 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSampler.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/ComputePoolSampler.scala @@ -24,7 +24,7 @@ package metrics * @param queue * the monitored local queue */ -private[unsafe] class ComputePoolSampler(compute: WorkStealingThreadPool) +private[unsafe] final class ComputePoolSampler(compute: WorkStealingThreadPool) extends ComputePoolSamplerMBean { def getWorkerThreadCount(): Int = compute.getWorkerThreadCount() def getActiveThreadCount(): Int = compute.getActiveThreadCount() diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/LocalQueueSampler.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/LocalQueueSampler.scala index 19985ea9c2..3abc9eb579 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/LocalQueueSampler.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/LocalQueueSampler.scala @@ -24,7 +24,8 @@ package metrics * @param queue * the monitored local queue */ -private[unsafe] class LocalQueueSampler(queue: LocalQueue) extends LocalQueueSamplerMBean { +private[unsafe] final class LocalQueueSampler(queue: LocalQueue) + extends LocalQueueSamplerMBean { def getFiberCount(): Int = queue.getFiberCount() def getHeadIndex(): Int = queue.getHeadIndex() def getTailIndex(): Int = queue.getTailIndex()