Enqueue all fibers from evloop at once
Fixes a bunch of issues caused by evloop#run enqueueing fibers while we're
running the evloop. For ST, the many individual enqueues are replaced with a
single bulk enqueue (simpler, faster). For MT, waking up a thread is now
handled properly: we no longer try to interrupt the evloop that is currently
running, and we no longer skip the wakeup just because a spinning thread
happens to be running the evloop.
ysbaddaden committed Oct 3, 2024
1 parent d0bad52 commit 1955ad2
Showing 5 changed files with 110 additions and 46 deletions.
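
To make the change concrete before diving into the diff, here is a minimal, self-contained sketch of the pattern this commit introduces: the event loop fills a caller-owned queue instead of enqueueing fibers one by one, and the scheduler then resumes one fiber directly and bulk-pushes the rest in a single operation. Everything below (ToyFiber, ToyQueue, fake_evloop_run) is a stand-in invented for illustration, not the shard's actual API.

# Toy fiber: just a name, plus an intrusive link for the queue below.
class ToyFiber
  getter name : String
  property schedlink : ToyFiber?

  def initialize(@name)
  end
end

# Toy LIFO queue exposing the same surface the diff relies on:
# push, pop?, empty?, size and bulk_push.
class ToyQueue
  getter size = 0
  @head : ToyFiber? = nil

  def push(fiber : ToyFiber) : Nil
    fiber.schedlink = @head
    @head = fiber
    @size += 1
  end

  def pop? : ToyFiber?
    if fiber = @head
      @head = fiber.schedlink
      @size -= 1
      fiber
    end
  end

  def empty? : Bool
    @size == 0
  end

  def bulk_push(other : ToyQueue) : Nil
    while fiber = other.pop?
      push(fiber)
    end
  end
end

# Stand-in for EventLoop#run: yields every fiber whose event became ready.
def fake_evloop_run(& : ToyFiber ->) : Bool
  3.times { |i| yield ToyFiber.new("ready-#{i}") }
  true
end

runnables = ToyQueue.new # the scheduler's local run queue
collected = ToyQueue.new # filled by the event loop in a single pass

fake_evloop_run { |fiber| collected.push(fiber) }

# One fiber is resumed immediately; the rest are enqueued in bulk.
if fiber = collected.pop?
  runnables.bulk_push(collected) unless collected.empty?
  puts "resume #{fiber.name}, bulk enqueued #{runnables.size}"
end
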
10 changes: 10 additions & 0 deletions src/core_ext/event_loop.cr
@@ -5,6 +5,16 @@ abstract class Crystal::EventLoop
end
end

{% if Crystal.has_constant?(:Evented) %}
abstract class Crystal::Evented::EventLoop
@[AlwaysInline]
def run(runnables : ExecutionContext::Queue*, blocking : Bool) : Bool
system_run(blocking) { |fiber| runnables.value.push(fiber) }
true
end
end
{% end %}

{% if flag?(:unix) && flag?(:evloop_libevent) %}
class Crystal::LibEvent::EventLoop
# Create a new resume event for a fiber.
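
A note on the Queue* parameter above: passing a pointer presumably lets the caller keep the queue as a plain stack-allocated struct and have the system_run block append to it in place, without allocating per wakeup. The sketch below mirrors that calling shape with made-up types (ToyList, toy_run); it is not the shard's code.

# Toy stack-allocated list; it only records names, enough to show the shape.
struct ToyList
  @names = [] of String

  def push(name : String) : Nil
    @names << name
  end

  def size : Int32
    @names.size
  end
end

# Mirrors the wrapper above: every ready "fiber" is handed to the caller's
# list through the pointer instead of being enqueued globally.
def toy_run(runnables : ToyList*, blocking : Bool) : Bool
  %w[fiber-a fiber-b].each { |name| runnables.value.push(name) }
  true
end

list = ToyList.new
toy_run(pointerof(list), blocking: false)
puts list.size # => 2
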
23 changes: 19 additions & 4 deletions src/isolated.cr
@@ -103,21 +103,36 @@ module ExecutionContext
# wait on the evloop
@waiting.set(true, :release)

queue = Queue.new

loop do
# check parallel enqueue first (on second iteration checks if evloop
# enqueued)
# check for parallel enqueue
if @enqueued.get(:acquire)
@waiting.set(false, :release)
@enqueued.set(false, :relaxed)
@enqueued.set(false, :release)
Crystal.trace :sched, "resume"
return
end

unless @event_loop.run(blocking: true)
if @event_loop.run(pointerof(queue), blocking: true)
if fiber = queue.pop?
unless fiber == @main_fiber && queue.empty?
raise RuntimeError.new("ERROR: concurrency is disabled in isolated contexts")
end
@waiting.set(false, :release)
@enqueued.set(false, :release)
Crystal.trace :sched, "resume"
return
else
# the evloop got interrupted: restart
next
end
else
# evloop doesn't wait when empty (e.g. libevent)
break
end
end
@waiting.set(false, :relaxed)

# empty evloop: park the thread
@mutex.synchronize do
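
The notable invariant in the isolated loop above: the only fiber the event loop may ever hand back is the context's own main fiber; anything else means concurrency sneaked into a context where it is forbidden. A tiny illustration of that check, with strings standing in for fibers:

main_fiber = "main"
queue = ["main"] # what the event loop collected; here only the main fiber

if fiber = queue.pop?
  unless fiber == main_fiber && queue.empty?
    raise RuntimeError.new("ERROR: concurrency is disabled in isolated contexts")
  end
  puts "resume #{fiber}"
end
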
34 changes: 19 additions & 15 deletions src/multi_threaded.cr
@@ -168,7 +168,7 @@ module ExecutionContext

@condition.wait(@mutex)

@parked.sub(1, :acquire_release)
# we don't decrement @parked because #wake_scheduler did
Crystal.trace :sched, "wakeup"
end

@@ -183,15 +183,14 @@ module ExecutionContext
# parking when there is a parallel cross context enqueue!
protected def wake_scheduler : Nil
# another thread is spinning: nothing to do (it shall notice the enqueue)
return if @spinning.get(:acquire) > 0
if @spinning.get(:relaxed) > 0
return
end

# try to interrupt a thread waiting on the event loop
#
# FIXME: don't interrupt ourselves (i.e. current thread is running the
# eventloop...)
#
# OPTIMIZE: with one eventloop per execution context, we might prefer to
# wakeup a parked thread *before* interrupting the event loop.
# wakeup a parked thread *before* interrupting the event loop?
if @event_loop_lock.get(:relaxed)
@event_loop.interrupt
return
@@ -206,14 +205,20 @@ module ExecutionContext
# the current thread
if @parked.get(:acquire) > 0
@mutex.synchronize do
# OPTIMIZE: relaxed atomics should be enough (we're inside a mutex
# that shall already deal with memory order
# OPTIMIZE: relaxed atomic should be enough
return if @parked.get(:acquire) == 0
return if @spinning.get(:acquire) > 0

# OPTIMIZE: don't unpark if there are enough spinning threads
return if @spinning.get(:relaxed) > 0

# increase the number of spinning threads _now_ to avoid multiple
# threads from trying to wakeup multiple threads at the same time
@spinning.add(1, :acquire_release)
#
# we must also decrement the number of parked threads because another
# thread could lock the mutex and increment @spinning again before the
# signaled thread is resumed (oops)
spinning = @spinning.add(1, :acquire_release)
parked = @parked.sub(1, :acquire_release)

# wakeup a thread
@condition.signal
@@ -222,8 +227,8 @@ module ExecutionContext
end

# shall we start another thread?
# no need for atomics, the values shall be rather stable ovber time and
# we check them again inside the mutex.
# no need for atomics, the values shall be rather stable over time and we
# check them again inside the mutex.
return if @threads.size == @size.end

@mutex.synchronize do
Expand All @@ -234,11 +239,10 @@ module ExecutionContext
end
end

protected def lock_evloop? : Bool
protected def lock_evloop?(& : -> Bool) : Bool
if @event_loop_lock.swap(true, :acquire) == false
begin
yield @event_loop
true
yield
ensure
@event_loop_lock.set(false, :release)
end
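
The wake_scheduler changes in this file boil down to a priority cascade. The sketch below restates it with plain values so the ordering is easy to follow; the parameter names echo the diff, but none of this is the shard's implementation (the real code uses atomics, a mutex and a condition variable, and does the spinning/parked bookkeeping before signaling):

def wake_decision(spinning : Int32, evloop_locked : Bool, parked : Int32, at_capacity : Bool) : String
  if spinning > 0
    "do nothing: a spinning thread will notice the enqueue"
  elsif evloop_locked
    "interrupt the event loop so its thread picks the fiber up"
  elsif parked > 0
    # in the real code, @spinning is incremented and @parked decremented
    # inside the mutex *before* signaling, so concurrent wakers don't all
    # target the same parked thread
    "unpark one thread (spinning += 1, parked -= 1, then signal)"
  elsif at_capacity
    "do nothing: every thread already exists and is busy"
  else
    "start a new scheduler thread"
  end
end

puts wake_decision(0, true, 3, false) # => interrupt the event loop ...
puts wake_decision(1, true, 3, false) # => do nothing: a spinning thread ...
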
59 changes: 41 additions & 18 deletions src/multi_threaded/scheduler.cr
@@ -46,6 +46,21 @@ module ExecutionContext
@execution_context.wake_scheduler unless @execution_context.capacity == 1
end

# Enqueues a list of fibers in a single operation and returns a fiber to
# resume immediately.
#
# This is called after running the event loop for example.
private def enqueue(queue : Queue*) : Fiber?
if fiber = queue.value.pop?
Crystal.trace :sched, "enqueue", size: queue.value.size, fiber: fiber
unless queue.value.empty?
@runnables.bulk_push(queue)
@execution_context.wake_scheduler
end
fiber
end
end

protected def reschedule : Nil
Crystal.trace :sched, "reschedule"
if fiber = quick_dequeue?
@@ -100,6 +115,7 @@ module ExecutionContext
private def try_steal? : Fiber?
@execution_context.steal do |other|
if other == self
# no need to steal from ourselves
next
end

@@ -142,19 +158,22 @@ module ExecutionContext
end

private def find_next_runnable(&) : Nil
queue = Queue.new

# nothing to do: start spinning
spinning do
if @execution_context.lock_evloop? { @event_loop.run(blocking: false) }
yield @runnables.get?
end

yield @global_queue.grab?(@runnables, divisor: @execution_context.size)

if @execution_context.lock_evloop? { @event_loop.run(pointerof(queue), blocking: false) }
spin_stop
yield enqueue(pointerof(queue))
end

yield try_steal?
end

# wait on the event loop for events and timers to activate
@execution_context.lock_evloop? do
result = @execution_context.lock_evloop? do
@waiting = true

# there is a time window between stop spinning and start waiting
@@ -163,22 +182,29 @@ module ExecutionContext
# which may block for a long time:
yield @global_queue.grab?(@runnables, divisor: @execution_context.size)

if @event_loop.run(blocking: true)
# the event loop enqueud a fiber or was interrupted: restart
return
end
# block on the event loop until an event is ready or the loop is
# interrupted
@event_loop.run(pointerof(queue), blocking: true)
ensure
@waiting = false
end

if result
yield enqueue(pointerof(queue))

# the event loop was interrupted: restart
return
else
# evloop doesn't wait when empty (e.g. libevent)
end

# no runnable fiber, no event in the event loop or another thread is
# already running the event loop: go into deep sleep until another
# scheduler or another context enqueues a fiber
# already running it: go into deep sleep until another scheduler or
# another context enqueues a fiber
@execution_context.park_thread do
# by the time we acquired the lock, another thread may have
# enqueued fiber(s) and already tried to wakeup a thread (race).
# we don't check the scheduler's local queue nor its event loop
# (both are empty)
# by the time we acquired the lock, another thread may have enqueued
# fiber(s) and already tried to wakeup a thread (race). we don't check
# the scheduler's local queue nor its event loop (both are empty)
yield @global_queue.unsafe_grab?(@runnables, divisor: @execution_context.size)

# OPTIMIZE: may hold the lock for a while (increasing with threads)
@@ -197,9 +223,6 @@ module ExecutionContext

# OPTIMIZE: skip spinning if spinning >= running/2
private def spinning(&)
# we could avoid spinning with MT:1 but another context could try to
# enqueue... maybe keep a counter of execution contexts?
# return if @execution_context.size == 1
spin_start

4.times do |iter|
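
To summarize this file's reworked find_next_runnable, here is its lookup order as a stubbed-out outline. The stub methods are placeholders invented for the sketch (they always come up empty), not the scheduler's real API; the point is the ordering: spin over the global queue, a non-blocking event loop pass and work stealing, then a blocking event loop pass under the evloop lock, then park.

class OutlineScheduler
  # Placeholder lookups; the real scheduler consults its global queue,
  # the event loop (collecting into a local queue) and other schedulers.
  private def grab_global? : String?
    nil
  end

  private def run_evloop?(blocking : Bool) : Bool
    false
  end

  private def pop_collected? : String?
    nil
  end

  private def try_steal? : String?
    nil
  end

  def find_next_runnable : String?
    4.times do                        # spinning phase
      if fiber = grab_global?
        return fiber
      end
      if run_evloop?(blocking: false) # non-blocking evloop pass
        return pop_collected?
      end
      if fiber = try_steal?
        return fiber
      end
    end

    if run_evloop?(blocking: true)    # blocking pass, under the evloop lock
      return pop_collected?
    end

    # nothing anywhere: this is where the real code parks the thread
    nil
  end
end

OutlineScheduler.new.find_next_runnable
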
30 changes: 21 additions & 9 deletions src/single_threaded.cr
@@ -97,6 +97,13 @@ module ExecutionContext
end
end

private def enqueue(queue : Queue*) : Fiber?
if fiber = queue.value.pop?
@runnables.bulk_push(queue) unless queue.value.empty?
fiber
end
end

protected def reschedule : Nil
Crystal.trace :sched, "reschedule"
if fiber = quick_dequeue?
@@ -151,10 +158,10 @@ module ExecutionContext
end

# run the event loop to see if any event is activable
if @event_loop.run(blocking: false)
if fiber = @runnables.get?
return fiber
end
queue = Queue.new

if @event_loop.run(pointerof(queue), blocking: false)
return enqueue(pointerof(queue))
end
end

@@ -190,12 +197,14 @@ module ExecutionContext
end

private def find_next_runnable(&) : Nil
queue = Queue.new

# nothing to do: start spinning
spinning do
yield @global_queue.grab?(@runnables, divisor: 1)

if @event_loop.run(blocking: false)
yield @runnables.get?
if @event_loop.run(pointerof(queue), blocking: false)
yield enqueue(pointerof(queue))
end
end

@@ -207,9 +216,13 @@ module ExecutionContext
# block for a long time):
yield @global_queue.grab?(@runnables, divisor: 1)

if @event_loop.run(blocking: true)
# the event loop enqueued a fiber or was interrupted: restart
if @event_loop.run(pointerof(queue), blocking: true)
yield enqueue(pointerof(queue))

# the event loop was interrupted: restart
return
else
# the event loop doesn't wait when empty: go to park thread
end
end

@@ -288,7 +301,6 @@ module ExecutionContext
# queue to try and wakeup the ST thread running in parallel that may be
# running, spinning, waiting or parked.
private def wake_scheduler : Nil
# return unless @idle
return if @spinning.get(:acquire)

if @waiting.get(:acquire)
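
Wrapping up the single-threaded changes: the ST enqueue(queue : Queue*) helper added in this file needs no wake_scheduler call, since the thread that just ran the event loop is the one that will resume the fibers. A tiny illustration with arrays standing in for the intrusive queues:

runnables = [] of String   # the scheduler's local run queue
collected = %w[f1 f2 f3]   # what the event loop handed back

if fiber = collected.shift?
  runnables.concat(collected) unless collected.empty? # one bulk push, no wakeup
  puts "resume #{fiber} (#{runnables.size} kept for later)"
end
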
