Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compute pool metrics #2534

Merged
merged 8 commits into from
Nov 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ private[effect] final class WorkStealingThreadPool(
*/
private[this] val done: AtomicBoolean = new AtomicBoolean(false)

private[this] val blockedWorkerThreadCounter: AtomicInteger = new AtomicInteger(0)

// Thread pool initialization block.
{
// Set up the worker threads.
Expand Down Expand Up @@ -563,4 +565,81 @@ private[effect] final class WorkStealingThreadPool(
Thread.currentThread().interrupt()
}
}

private[unsafe] def blockedWorkerThreadCounterForwarder: AtomicInteger =
blockedWorkerThreadCounter
Comment on lines +569 to +570
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pedagogical: why keep this behind a forwarder?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of this BS:

class Stupid {
  val blah = 5
}

compiles to the following bytecode:

public class cats.effect.Stupid {
  private final int blah;

  private volatile boolean bitmap$init$0;

  public int blah();
    Code:
       0: aload_0
       1: getfield      #15                 // Field bitmap$init$0:Z
       4: ifeq          14
       7: aload_0
       8: getfield      #17                 // Field blah:I
      11: goto          24
      14: new           #19                 // class scala/UninitializedFieldError
      17: dup
      18: ldc           #21                 // String Uninitialized field: /Users/vasil/Code/cats-effect/core/shared/src/main/scala/cats/effect/Stupid.scala: 20
      20: invokespecial #25                 // Method scala/UninitializedFieldError."<init>":(Ljava/lang/String;)V
      23: athrow
      24: pop
      25: aload_0
      26: getfield      #17                 // Field blah:I
      29: ireturn

  public cats.effect.Stupid();
    Code:
       0: aload_0
       1: invokespecial #30                 // Method java/lang/Object."<init>":()V
       4: aload_0
       5: iconst_5
       6: putfield      #17                 // Field blah:I
       9: aload_0
      10: iconst_1
      11: putfield      #15                 // Field bitmap$init$0:Z
      14: return
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The blah() getter is synchronized on the bitmap field, which is volatile, incurring unnecessary synchronization points.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, I thought this happened even for private[this] fields which is why you moved everything to the constructor?

Copy link
Member Author

@vasilmkd vasilmkd Nov 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does in the constructor. But for private[this] fields, the getter is not automatically generated by the compiler. A "forwarder" method accesses the field raw.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class Forwarder {
  private[this] val blah: Int = 5

  def blahForwarder: Int = blah
}

compiles to

public class cats.effect.Forwarder {
  private final int blah;

  private volatile boolean bitmap$init$0;

  public int blahForwarder();
    Code:
       0: aload_0
       1: getfield      #16                 // Field blah:I
       4: ireturn

  public cats.effect.Forwarder();
    Code:
       0: aload_0
       1: invokespecial #22                 // Method java/lang/Object."<init>":()V
       4: aload_0
       5: iconst_5
       6: putfield      #16                 // Field blah:I
       9: aload_0
      10: iconst_1
      11: putfield      #24                 // Field bitmap$init$0:Z
      14: return
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forwarder is a Cats Effect nomenclature. I think I heard Daniel say it once, or use it, and I continued doing so. It doesn't have any meaning. It's the same as getter, the point is to write it ourselves and circumvent the synchronization that the Scala compiler would do otherwise. 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks 🤓 I'm very glad there's a workaround! But I wish I understood why the Scala compiler does this.


/*
* What follows is a collection of methos used in the implementation of the
* `cats.effect.unsafe.metrics.ComputePoolSamplerMBean` interface.
*/

/**
* Returns the number of [[WorkerThread]] instances backing the [[WorkStealingThreadPool]].
*
* @note
* This is a fixed value, as the [[WorkStealingThreadPool]] has a fixed number of worker
* threads.
*
* @return
* the number of worker threads backing the compute pool
*/
private[unsafe] def getWorkerThreadCount(): Int =
threadCount

/**
* Returns the number of active [[WorkerThread]] instances currently executing fibers on the
* compute thread pool.
*
* @return
* the number of active worker threads
*/
private[unsafe] def getActiveThreadCount(): Int = {
val st = state.get()
(st & UnparkMask) >>> UnparkShift
}

/**
* Returns the number of [[WorkerThread]] instances currently searching for fibers to steal
* from other worker threads.
*
* @return
* the number of worker threads searching for work
*/
private[unsafe] def getSearchingThreadCount(): Int = {
val st = state.get()
st & SearchMask
}

/**
* Returns the number of [[WorkerThread]] instances which are currently blocked due to running
* blocking actions on the compute thread pool.
*
* @return
* the number of blocked worker threads
*/
private[unsafe] def getBlockedWorkerThreadCount(): Int =
blockedWorkerThreadCounter.get()

/**
* Returns the total number of fibers enqueued on all local queues.
*
* @return
* the total number of fibers enqueued on all local queues
*/
private[unsafe] def getLocalQueueFiberCount(): Long =
localQueues.map(_.size().toLong).sum

/**
* Returns the number of fibers which are currently asynchronously suspended.
*
* @note
* This counter is not synchronized due to performance reasons and might be reporting
* out-of-date numbers.
*
* @return
* the number of asynchronously suspended fibers
*/
private[unsafe] def getSuspendedFiberCount(): Long =
workerThreads.map(_.getSuspendedFiberCount().toLong).sum
}
21 changes: 20 additions & 1 deletion core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ private final class WorkerThread(
// this worker thread has run its course and it is time to die, after the
// blocking code has been successfully executed.
blocking = true
pool.blockedWorkerThreadCounterForwarder.incrementAndGet()

// Spawn a new `WorkerThread`, a literal clone of this one. It is safe to
// transfer ownership of the local queue and the parked signal to the new
Expand All @@ -536,7 +537,25 @@ private final class WorkerThread(

// With another `WorkerThread` started, it is time to execute the blocking
// action.
thunk
val result = thunk

pool.blockedWorkerThreadCounterForwarder.decrementAndGet()

result
}
}

/**
* Returns the number of fibers which are currently asynchronously suspended and tracked by
* this worker thread.
*
* @note
* This counter is not synchronized due to performance reasons and might be reporting
* out-of-date numbers.
*
* @return
* the number of asynchronously suspended fibers
*/
def getSuspendedFiberCount(): Int =
fiberBag.size()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2020-2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.unsafe
package metrics

/**
* An implementation of the [[ComputePoolSamplerMBean]] interface which simply delegates to the
* corresponding methods of the [[cats.effect.unsafe.WorkStealingThreadPool]] being monitored.
*
* @param queue
* the monitored local queue
*/
private[unsafe] final class ComputePoolSampler(compute: WorkStealingThreadPool)
extends ComputePoolSamplerMBean {
def getWorkerThreadCount(): Int = compute.getWorkerThreadCount()
def getActiveThreadCount(): Int = compute.getActiveThreadCount()
def getSearchingThreadCount(): Int = compute.getSearchingThreadCount()
def getBlockedWorkerThreadCount(): Int = compute.getBlockedWorkerThreadCount()
def getLocalQueueFiberCount(): Long = compute.getLocalQueueFiberCount()
def getSuspendedFiberCount(): Long = compute.getSuspendedFiberCount()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2020-2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.unsafe.metrics

/**
* An MBean interface for monitoring a [[WorkStealingThreadPool]] backed compute thread pool.
*/
private[unsafe] trait ComputePoolSamplerMBean {

/**
* Returns the number of [[WorkerThread]] instances backing the [[WorkStealingThreadPool]].
*
* @note
* This is a fixed value, as the [[WorkStealingThreadPool]] has a fixed number of worker
* threads.
*
* @return
* the number of worker threads backing the compute pool
*/
def getWorkerThreadCount(): Int

/**
* Returns the number of active [[WorkerThread]] instances currently executing fibers on the
* compute thread pool.
*
* @return
* the number of active worker threads
*/
def getActiveThreadCount(): Int

/**
* Returns the number of [[WorkerThread]] instances currently searching for fibers to steal
* from other worker threads.
*
* @return
* the number of worker threads searching for work
*/
def getSearchingThreadCount(): Int

/**
* Returns the number of [[WorkerThread]] instances which are currently blocked due to running
* blocking actions on the compute thread pool.
*
* @return
* the number of blocked worker threads
*/
def getBlockedWorkerThreadCount(): Int

/**
* Returns the total number of fibers enqueued on all local queues.
*
* @return
* the total number of fibers enqueued on all local queues
*/
def getLocalQueueFiberCount(): Long

/**
* Returns the number of fibers which are currently asynchronously suspended.
*
* @return
* the number of asynchronously suspended fibers
*/
def getSuspendedFiberCount(): Long
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ package metrics
* @param queue
* the monitored local queue
*/
private[unsafe] class LocalQueueSampler(queue: LocalQueue) extends LocalQueueSamplerMBean {
private[unsafe] final class LocalQueueSampler(queue: LocalQueue)
extends LocalQueueSamplerMBean {
def getFiberCount(): Int = queue.getFiberCount()
def getHeadIndex(): Int = queue.getHeadIndex()
def getTailIndex(): Int = queue.getTailIndex()
Expand Down